
Commit 755441b

Fix no currently running Dispatcher is found issue (#715)
* Fix no currently running Dispatcher is found issue
* Address unittest scenarios
* Log to stderr with console prefix when AsyncLoggingHandler fails
* Fix syntax issue
1 parent 5d21aca commit 755441b
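
For context, the "no currently running Dispatcher is found" error comes from the dispatcher lookup used by the worker's logging handler. Below is a minimal sketch of that lookup, assumed from the DispatcherMeta.__current_dispatcher__ reset visible in the dispatcher.py diff; it is not part of this commit's changes:

    class DispatcherMeta(type):
        # Tracks the single dispatcher instance serving this worker process.
        __current_dispatcher__ = None

        @property
        def current(mcls):
            disp = mcls.__current_dispatcher__
            if disp is None:
                # Hit when a log record is emitted before a dispatcher exists
                # or after it has been torn down.
                raise RuntimeError('no currently running Dispatcher is found')
            return disp

The fix below stops this RuntimeError from escaping AsyncLoggingHandler.emit and falls back to a prefixed stderr message instead.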

8 files changed (+265 -32 lines changed)

azure_functions_worker/constants.py

Lines changed: 4 additions & 0 deletions
@@ -1,5 +1,9 @@
 # Copyright (c) Microsoft Corporation. All rights reserved.
 # Licensed under the MIT License.
+
+# Prefixes
+CONSOLE_LOG_PREFIX = "LanguageWorkerConsoleLog"
+
 # Capabilities
 RAW_HTTP_BODY_BYTES = "RawHttpBodyBytes"
 TYPED_DATA_COLLECTION = "TypedDataCollection"
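
The prefix exists so that lines written straight to the worker's console can still be recognized by the Functions host as worker output (the print-logging test function below notes that prefixed lines are treated as system logs). A minimal, hedged usage sketch mirroring the stderr fallback added in dispatcher.py:

    import sys
    from azure_functions_worker.constants import CONSOLE_LOG_PREFIX

    # A console line carrying the prefix is picked up by the Functions host
    # as worker/system output rather than plain customer print() output.
    print(f'{CONSOLE_LOG_PREFIX} ERROR: gRPC logging is unavailable',
          file=sys.stderr, flush=True)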

azure_functions_worker/dispatcher.py

Lines changed: 37 additions & 13 deletions
@@ -23,11 +23,12 @@
 from . import protos
 from . import constants

+from .constants import CONSOLE_LOG_PREFIX
 from .logging import error_logger, logger, is_system_log_category
 from .logging import enable_console_logging, disable_console_logging
 from .utils.tracing import marshall_exception_trace
 from .utils.wrappers import disable_feature_by
-from asyncio.unix_events import _UnixSelectorEventLoop
+from asyncio import BaseEventLoop
 from logging import LogRecord
 from typing import Optional

@@ -48,7 +49,7 @@ class Dispatcher(metaclass=DispatcherMeta):

     _GRPC_STOP_RESPONSE = object()

-    def __init__(self, loop: _UnixSelectorEventLoop, host: str, port: int,
+    def __init__(self, loop: BaseEventLoop, host: str, port: int,
                  worker_id: str, request_id: str,
                  grpc_connect_timeout: float,
                  grpc_max_msg_len: int = -1) -> None:
@@ -113,22 +114,29 @@ async def dispatch_forever(self):
             self._loop.set_task_factory(
                 lambda loop, coro: ContextEnabledTask(coro, loop=loop))

-            # Attach gRPC logging to the root logger
+            # Detach console logging before enabling GRPC channel logging
+            logger.info('Detaching console logging.')
+            disable_console_logging()
+
+            # Attach gRPC logging to the root logger. Since gRPC channel is
+            # established, should use it for system and user logs
             logging_handler = AsyncLoggingHandler()
             root_logger = logging.getLogger()
             root_logger.setLevel(logging.INFO)
             root_logger.addHandler(logging_handler)
-
-            # Since gRPC channel is established, should use it for logging
-            disable_console_logging()
-            logger.info('Detach console logging. Switch to gRPC logging')
+            logger.info('Switched to gRPC logging.')
+            logging_handler.flush()

             try:
                 await forever
             finally:
-                # Re-enable console logging when there's an exception
-                enable_console_logging()
+                logger.warn('Detaching gRPC logging due to exception.')
+                logging_handler.flush()
                 root_logger.removeHandler(logging_handler)
+
+                # Reenable console logging when there's an exception
+                enable_console_logging()
+                logger.warn('Switched to console logging due to exception.')
         finally:
             DispatcherMeta.__current_dispatcher__ = None

@@ -163,7 +171,7 @@ def on_logging(self, record: logging.LogRecord, formatted_msg: str) -> None:

         if is_system_log_category(record.name):
             log_category = protos.RpcLog.RpcLogCategory.System
-        else:
+        else:  # customers using logging will yield 'root' in record.name
             log_category = protos.RpcLog.RpcLogCategory.User

         log = dict(
@@ -365,6 +373,9 @@ async def _handle__invocation_request(self, req):
                 fi.return_type.binding_name, call_result,
                 pytype=fi.return_type.pytype)

+            # Actively flush customer print() function to console
+            sys.stdout.flush()
+
             logger.info('Successfully processed FunctionInvocationRequest, '
                         'request ID: %s, function ID: %s, invocation ID: %s',
                         self.request_id, function_id, invocation_id)
@@ -518,10 +529,23 @@ def gen(resp_queue):
 class AsyncLoggingHandler(logging.Handler):

     def emit(self, record: LogRecord) -> None:
-        # Since we disable console log after gRPC channel is initiated
-        # We should redirect all the messages into dispatcher
+        # Since we disable console log after gRPC channel is initiated,
+        # we should redirect all the messages into dispatcher.
+
+        # When dispatcher receives an exception, it should switch back
+        # to console logging. However, it is possible that
+        # __current_dispatcher__ is set to None as there are still messages
+        # buffered in this handler, not calling the emit yet.
         msg = self.format(record)
-        Dispatcher.current.on_logging(record, msg)
+        try:
+            Dispatcher.current.on_logging(record, msg)
+        except RuntimeError as runtime_error:
+            # This will cause 'Dispatcher not found' failure.
+            # Logging such of an issue will cause infinite loop of gRPC logging
+            # To mitigate, we should suppress the 2nd level error logging here
+            # and use print function to report exception instead.
+            print(f'{CONSOLE_LOG_PREFIX} ERROR: {str(runtime_error)}',
+                  file=sys.stderr, flush=True)


 class ContextEnabledTask(asyncio.Task):
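
The commit message mentions addressing unittest scenarios. The following is a self-contained sketch of how the new fallback behaviour can be exercised; it is an illustrative stand-in, not the repository's actual test code, and the handler and helper names here are hypothetical:

    import io
    import logging
    import sys
    from contextlib import redirect_stderr

    CONSOLE_LOG_PREFIX = 'LanguageWorkerConsoleLog'  # mirrors constants.py


    def _current_dispatcher():
        # Simulates Dispatcher.current after __current_dispatcher__ was reset.
        raise RuntimeError('no currently running Dispatcher is found')


    class FallbackHandler(logging.Handler):
        # Stand-in for AsyncLoggingHandler with the try/except added above.
        def emit(self, record: logging.LogRecord) -> None:
            msg = self.format(record)
            try:
                _current_dispatcher().on_logging(record, msg)
            except RuntimeError as runtime_error:
                # print() instead of logging avoids re-entering this handler.
                print(f'{CONSOLE_LOG_PREFIX} ERROR: {runtime_error}',
                      file=sys.stderr, flush=True)


    buf = io.StringIO()
    with redirect_stderr(buf):
        demo_logger = logging.getLogger('fallback_demo')
        demo_logger.addHandler(FallbackHandler())
        demo_logger.error('late message after dispatcher teardown')

    # The record lands on stderr with the console prefix instead of raising.
    assert CONSOLE_LOG_PREFIX in buf.getvalue()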

azure_functions_worker/logging.py

Lines changed: 22 additions & 18 deletions
@@ -1,23 +1,33 @@
 # Copyright (c) Microsoft Corporation. All rights reserved.
 # Licensed under the MIT License.
+
+from typing import Optional
 import logging
 import logging.handlers
 import sys

+from .constants import CONSOLE_LOG_PREFIX
+

-logger = logging.getLogger('azure_functions_worker')
-error_logger = logging.getLogger('azure_functions_worker_errors')
+logger: logging.Logger = logging.getLogger('azure_functions_worker')
+error_logger: logging.Logger = (
+    logging.getLogger('azure_functions_worker_errors'))

-handler = None
-error_handler = None
+handler: Optional[logging.Handler] = None
+error_handler: Optional[logging.Handler] = None


 def setup(log_level, log_destination):
+    # Since handler and error_handler are moved to the global scope,
+    # before assigning to these handlers, we should define 'global' keyword
+    global handler
+    global error_handler
+
     if log_level == 'TRACE':
         log_level = 'DEBUG'

-    formatter = logging.Formatter(
-        'LanguageWorkerConsoleLog %(levelname)s: %(message)s')
+    formatter = logging.Formatter(f'{CONSOLE_LOG_PREFIX}'
+                                  ' %(levelname)s: %(message)s')

     if log_destination is None:
         # With no explicit log destination we do split logging,
@@ -51,25 +61,19 @@ def setup(log_level, log_destination):


 def disable_console_logging() -> None:
+    # We should only remove the sys.stdout stream, as error_logger is used for
+    # unexpected critical error logs handling.
     if logger and handler:
+        handler.flush()
         logger.removeHandler(handler)

-    if error_logger and error_handler:
-        error_logger.removeHandler(error_handler)
-

 def enable_console_logging() -> None:
     if logger and handler:
         logger.addHandler(handler)

-    if error_logger and error_handler:
-        error_logger.addHandler(error_handler)
-

 def is_system_log_category(ctg: str) -> bool:
-    return any(
-        [ctg.lower().startswith(c) for c in (
-            'azure_functions_worker',
-            'azure_functions_worker_errors'
-        )]
-    )
+    # Category starts with 'azure_functions_worker' or
+    # 'azure_functions_worker_errors' will be treated as system logs
+    return ctg.lower().startswith('azure_functions_worker')
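
Because both worker logger names share the 'azure_functions_worker' prefix, the two startswith checks collapse into a single one. A quick sketch of the resulting classification (the customer-side category names are illustrative; 'root' is the category customer code produces when calling logging.info() directly, as the dispatcher.py comment above notes):

    def is_system_log_category(ctg: str) -> bool:
        # Same logic as the simplified helper above.
        return ctg.lower().startswith('azure_functions_worker')

    # Worker-internal loggers are routed as System logs...
    assert is_system_log_category('azure_functions_worker')
    assert is_system_log_category('azure_functions_worker_errors')
    # ...while customer logging is routed as User logs.
    assert not is_system_log_category('root')
    assert not is_system_log_category('custom_logger')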
Lines changed: 19 additions & 0 deletions
@@ -0,0 +1,19 @@
+{
+  "scriptFile": "main.py",
+  "bindings": [
+    {
+      "authLevel": "anonymous",
+      "type": "httpTrigger",
+      "direction": "in",
+      "name": "req",
+      "methods": [
+        "get"
+      ]
+    },
+    {
+      "type": "http",
+      "direction": "out",
+      "name": "$return"
+    }
+  ]
+}
Lines changed: 78 additions & 0 deletions
@@ -0,0 +1,78 @@
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# Licensed under the MIT License.
+import sys
+import logging
+import asyncio
+
+import azure.functions as func
+
+
+logger = logging.getLogger('custom_logger')
+
+# Attempt to log info into system log from customer code
+disguised_logger = logging.getLogger('azure_functions_worker')
+
+
+async def parallelly_print():
+    await asyncio.sleep(0.1)
+    print('parallelly_print')
+
+
+async def parallelly_log_info():
+    await asyncio.sleep(0.2)
+    logging.info('parallelly_log_info at root logger')
+
+
+async def parallelly_log_warning():
+    await asyncio.sleep(0.3)
+    logging.warning('parallelly_log_warning at root logger')
+
+
+async def parallelly_log_error():
+    await asyncio.sleep(0.4)
+    logging.error('parallelly_log_error at root logger')
+
+
+async def parallelly_log_exception():
+    await asyncio.sleep(0.5)
+    try:
+        raise Exception('custom exception')
+    except Exception:
+        logging.exception('parallelly_log_exception at root logger',
+                          exc_info=sys.exc_info())
+
+
+async def parallelly_log_custom():
+    await asyncio.sleep(0.6)
+    logger.info('parallelly_log_custom at custom_logger')
+
+
+async def parallelly_log_system():
+    await asyncio.sleep(0.7)
+    disguised_logger.info('parallelly_log_system at disguised_logger')
+
+
+async def main(req: func.HttpRequest) -> func.HttpResponse:
+    loop = asyncio.get_event_loop()
+
+    # Create multiple tasks and schedule it into one asyncio.wait blocker
+    task_print: asyncio.Task = loop.create_task(parallelly_print())
+    task_info: asyncio.Task = loop.create_task(parallelly_log_info())
+    task_warning: asyncio.Task = loop.create_task(parallelly_log_warning())
+    task_error: asyncio.Task = loop.create_task(parallelly_log_error())
+    task_exception: asyncio.Task = loop.create_task(parallelly_log_exception())
+    task_custom: asyncio.Task = loop.create_task(parallelly_log_custom())
+    task_disguise: asyncio.Task = loop.create_task(parallelly_log_system())
+
+    # Create an awaitable future and occupy the current event loop resource
+    future = loop.create_future()
+    loop.call_soon_threadsafe(future.set_result, 'callsoon_log')
+
+    # WaitAll
+    await asyncio.wait([task_print, task_info, task_warning, task_error,
+                        task_exception, task_custom, task_disguise, future])
+
+    # Log asyncio low-level future result
+    logging.info(future.result())
+
+    return 'OK-hijack-current-event-loop'
Lines changed: 15 additions & 0 deletions
@@ -0,0 +1,15 @@
+{
+  "scriptFile": "main.py",
+  "bindings": [
+    {
+      "type": "httpTrigger",
+      "direction": "in",
+      "name": "req"
+    },
+    {
+      "type": "http",
+      "direction": "out",
+      "name": "$return"
+    }
+  ]
+}
Lines changed: 28 additions & 0 deletions
@@ -0,0 +1,28 @@
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# Licensed under the MIT License.
+
+import sys
+import azure.functions
+
+
+def main(req: azure.functions.HttpRequest):
+    flush_required = False
+    is_console_log = False
+    is_stderr = False
+    message = req.params.get('message', '')
+
+    if req.params.get('flush') == 'true':
+        flush_required = True
+    if req.params.get('console') == 'true':
+        is_console_log = True
+    if req.params.get('is_stderr') == 'true':
+        is_stderr = True
+
+    # Adding LanguageWorkerConsoleLog will make function host to treat
+    # this as system log and will be propagated to kusto
+    prefix = 'LanguageWorkerConsoleLog' if is_console_log else ''
+    print(f'{prefix} {message}'.strip(),
+          file=sys.stderr if is_stderr else sys.stdout,
+          flush=flush_required)
+
+    return 'OK-print-logging'
