Ensure shared process plugins shutdown cleanly #1309
Generally seems fine (although I'm only vaguely familiar with the plugin stuff).
Two things keeping me from 👍 --
- the default `loop` value in `wait_for_exit`
- the `TODO` left in the code
The rest are suggestions, if you like.
if not isinstance(plugin, BaseAsyncStopPlugin):
    continue

# TODO: This should run concurrently to not waste time
An `await asyncio.gather()` seems pretty doable in this PR. If there's some significant complication I'm not seeing, an issue is preferable to a TODO.
Yep, that was just me slacking 🌴 ☀️
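A minimal sketch of what stopping plugins concurrently with `asyncio.gather()` could look like, assuming each async plugin exposes an `async def stop()`. `FakePlugin` and `stop_all` are hypothetical names used only for illustration and are not part of trinity:

```python
import asyncio
from typing import List


class FakePlugin:
    """Hypothetical stand-in for a plugin with an async stop()."""

    def __init__(self, name: str, log: List[str]) -> None:
        self.name = name
        self._log = log

    async def stop(self) -> None:
        self._log.append(f"begin {self.name}")
        await asyncio.sleep(0.01)  # simulate asynchronous teardown work
        self._log.append(f"end {self.name}")


async def stop_all(plugins: List[FakePlugin]) -> None:
    # gather() schedules every stop() before any of them finishes,
    # so the teardowns overlap instead of running one after another.
    await asyncio.gather(*(plugin.stop() for plugin in plugins))


log: List[str] = []
asyncio.run(stop_all([FakePlugin("a", log), FakePlugin("b", log)]))
```

Both `begin` entries land in `log` before either `end` entry, which is the overlap the TODO asks for.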
plugin.set_context(context)

def shutdown(self) -> None:
def shutdown_nowait(self) -> None:
minor naming nitpick
I'm not sure the `_nowait` suffix is being used here in quite the same way as other `asyncio` library usages. This is potentially a long blocking wait for each plugin to shut down, right? Seems like it's more like a `shutdown_blocking` to me.
Ah, that seems like the right name for it 👍
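For comparison, a small sketch of how the `_nowait` suffix behaves on the standard library's `asyncio.Queue`: the `_nowait` variant never waits and raises right away if it cannot proceed, which is the opposite of a long blocking shutdown. The `demo` function is only an illustration:

```python
import asyncio


async def demo() -> str:
    queue: asyncio.Queue = asyncio.Queue()
    try:
        # get_nowait() returns immediately; it raises instead of waiting.
        queue.get_nowait()
    except asyncio.QueueEmpty:
        outcome = "raised QueueEmpty"
    else:
        outcome = "got an item"

    queue.put_nowait("item")
    # The plain get() is a coroutine that suspends until an item is available.
    item = await queue.get()
    return f"{outcome}, then {item}"


result = asyncio.run(demo())
```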
trinity/utils/shutdown.py
@asynccontextmanager
async def wait_for_exit(loop: asyncio.AbstractEventLoop = asyncio.get_event_loop()
I think this default value is going to be problematic. It will be evaluated once at import time, which is probably not desired. So far all callers explicitly set the loop, anyway.
How about we require the loop to be passed for now? If leaving it out becomes convenient at some point in the future, we can re-add it. (The default lookup would go in the method body, to detect the loop at runtime.)
> It will be evaluated once at import time

Oh, I assumed that would only be evaluated in the case it is called without that parameter. Didn't know these things are evaluated eagerly in Python (still a Python noob after all).

> How about we require the loop to be passed for now

Yes, let's do that.
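The eager evaluation discussed in this thread can be seen in a small standalone sketch. The names `next_id`, `eager` and `lazy` are made up for illustration; the same rule applies to a default of `asyncio.get_event_loop()`:

```python
import itertools
from typing import Optional

_counter = itertools.count()


def next_id() -> int:
    return next(_counter)


def eager(value: int = next_id()) -> int:
    # The default was computed once, when the `def` statement executed.
    return value


def lazy(value: Optional[int] = None) -> int:
    # Looking the default up in the body computes it on every call.
    if value is None:
        value = next_id()
    return value


eager_results = [eager(), eager()]
lazy_results = [lazy(), lazy()]
```

`eager_results` holds the same value twice, while `lazy_results` holds two different values.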
await service_to_exit.cancel()
if endpoint is not None:
    endpoint.stop()
service_to_exit._executor.shutdown(wait=True)
Maybe add another one like:
@asynccontextmanager
async def wait_for_service_exit(service_to_exit: BaseService) -> None:
    loop = service_to_exit.get_event_loop()
    async with wait_for_exit(loop):
        await service_to_exit.cancel()
        yield
        service_to_exit._executor.shutdown(wait=True)
`exit_on_signal` might go away or be pushed elsewhere, into `exit_service_with_plugin_endpoint(service, endpoint)` which just stops the endpoint, and `simple_service_exit(service)` which does nothing at all in `wait_for_service_exit`.
I went down that way but slightly adjusted naming scheme:
async def exit_with_service_and_endpoint(service_to_exit: BaseService, endpoint: Endpoint) -> None:
    async with exit_signal_with_service(service_to_exit):
        endpoint.stop()

async def exit_with_service(service_to_exit: BaseService) -> None:
    async with exit_signal_with_service(service_to_exit):
        pass

@asynccontextmanager
async def exit_signal_with_service(service_to_exit: BaseService) -> AsyncGenerator[None, None]:
    loop = service_to_exit.get_event_loop()
    async with exit_signal(loop):
        await service_to_exit.cancel()
        yield
        service_to_exit._executor.shutdown(wait=True)

@asynccontextmanager
async def exit_signal(loop: asyncio.AbstractEventLoop) -> AsyncGenerator[None, None]:
    sigint_received = asyncio.Event()
    for sig in [signal.SIGINT, signal.SIGTERM]:
        # TODO also support Windows
        loop.add_signal_handler(sig, sigint_received.set)

    await sigint_received.wait()
    try:
        yield
    finally:
        loop.stop()
I think this reads more natural:
async with exit_signal(loop):
    # do something
Same with:
asyncio.ensure_future(exit_with_service_and_endpoint(service, endpoint))
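The control flow of such a context manager (wait for a trigger, run the body, then always clean up) can be tried out in a runnable sketch. This is an assumption-laden variant, not the PR's code: `exit_trigger` is a hypothetical name, it waits on a manually set `asyncio.Event` instead of installing signal handlers, it records to a list instead of calling `loop.stop()`, and it uses `contextlib.asynccontextmanager` from Python 3.7+ rather than a backport:

```python
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncIterator, List


@asynccontextmanager
async def exit_trigger(trigger: asyncio.Event, log: List[str]) -> AsyncIterator[None]:
    # Hypothetical variant of exit_signal(): waits on an Event instead of
    # installing SIGINT/SIGTERM handlers, so it runs on any platform.
    await trigger.wait()
    log.append("trigger received")
    try:
        yield
    finally:
        # Stands in for loop.stop() in the snippet above.
        log.append("cleanup")


async def main() -> List[str]:
    log: List[str] = []
    trigger = asyncio.Event()
    # Simulate the signal arriving shortly after startup.
    asyncio.get_running_loop().call_later(0.01, trigger.set)
    async with exit_trigger(trigger, log):
        log.append("body: stop services")
    return log


events = asyncio.run(main())
```

The body of the `async with` block only runs once the trigger has fired, and the cleanup step runs even if the body raises.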
async with wait_for_exit(service.get_event_loop()):
    await service.cancel()
    await plugin_manager.shutdown()
    endpoint.stop()
With a new `wait_for_service_exit` this could be:
async with wait_for_service_exit(service):
    await plugin_manager.shutdown()
    endpoint.stop()
else:
    self._logger.info("Successfully stopped plugin: %s", plugin.name)

def _stop_async_plugins(self) -> Iterable[Awaitable[Optional[Exception]]]:
I pushed this out into a private method rather than using an inline comprehension for two reasons:
- I wanted to do `self._logger.info("Stopping plugin: %s", plugin.name)` to have the same logs as with sync stops
- We need an `isinstance` check to avoid a cast
@carver I think I addressed all your points
return context
stop_results = await asyncio.gather(*self._stop_async_plugins(), return_exceptions=True)

for plugin, result in zip(self._started_plugins, stop_results):
There isn't a result for sync plugins in `self._started_plugins`, so this will emit the wrong logs below when a sync plugin comes before an async one.
Maybe add a private method that returns all async plugins, so you can do:
async_plugins = self._get_async_plugins()
stop_results = await asyncio.gather(*self._stop_plugins(async_plugins), return_exceptions=True)
for plugin, result in zip(async_plugins, stop_results):
    ...
Oh, indeed! 🙉👍
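A standalone sketch of the alignment issue raised in this thread, under the assumption that sync and async plugins share one list. `SyncPlugin`, `AsyncPlugin` and `stop_async_plugins` are hypothetical stand-ins, not trinity classes. Filtering before calling `asyncio.gather()` keeps each result paired with the plugin that produced it:

```python
import asyncio
from typing import List, Optional, Tuple


class SyncPlugin:
    name = "sync"

    def stop(self) -> None:
        pass


class AsyncPlugin:
    def __init__(self, name: str, fail: bool = False) -> None:
        self.name = name
        self._fail = fail

    async def stop(self) -> None:
        if self._fail:
            raise RuntimeError(f"{self.name} failed to stop")


async def stop_async_plugins(started: list) -> List[Tuple[str, Optional[str]]]:
    # Filter first so plugins and results line up one-to-one.
    async_plugins = [p for p in started if isinstance(p, AsyncPlugin)]
    results = await asyncio.gather(
        *(p.stop() for p in async_plugins), return_exceptions=True
    )
    # With return_exceptions=True, a failure shows up as the exception object.
    return [
        (p.name, str(r) if isinstance(r, Exception) else None)
        for p, r in zip(async_plugins, results)
    ]


started = [SyncPlugin(), AsyncPlugin("ok"), AsyncPlugin("bad", fail=True)]
report = asyncio.run(stop_async_plugins(started))
```

Zipping `started` directly with the results would instead pair the sync plugin with the first async result and shift every later pairing by one.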
self._logger.info("Shutting down PluginManager with scope %s", type(self._scope))

async_plugins = [
@carver doing this inline here to not clutter the class too much with methods that are only used by this method. For a moment I thought we could reuse `_stop_plugins()` for both async / sync, but since the sync `stop()` needs to be surrounded with a try/except and is considered a success as soon as `stop()` returns, that isn't really reusable for the async version. I guess it's cleaner to handle these separately.
LGTM!
I failed to submit this single comment from sometime last week.
# A plugin that overtakes the main process never gets far enough to even get a context.
# For now it should be safe to just return `None`. Maybe reconsider in the future.
return None
I generally consider this an anti-pattern since return value checking is error prone. Don't feel strongly.... but I kind of do.... If there's a good reason why `try/except` doesn't fit well with this I'm fine with it.
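A rough sketch of the exception-based alternative hinted at here. All names (`NoContextError`, `create_context`, `describe`) are hypothetical and only illustrate the pattern; how it would fit into the actual plugin code is an open question:

```python
from typing import Dict


class NoContextError(Exception):
    """Hypothetical error for plugins that never receive a context."""


def create_context(plugin_kind: str) -> Dict[str, str]:
    if plugin_kind == "main-process":
        # Raising forces callers to handle the case explicitly,
        # instead of silently passing None further along.
        raise NoContextError(f"no context for {plugin_kind} plugins")
    return {"kind": plugin_kind}


def describe(plugin_kind: str) -> str:
    try:
        context = create_context(plugin_kind)
    except NoContextError:
        return "skipped"
    return f"context for {context['kind']}"


outcomes = [describe("isolated"), describe("main-process")]
```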
What was wrong?
Currently, plugins that run in the shared network process do not shut down cleanly in all scenarios (see #1284). To be more precise, they only shut down if they happen to be chained with some `CancelToken` which is triggered. This means that for all current plugins that run in the networking process (`TxPool`, `LightPeerChainBridge`) there isn't a real problem, as these will simply shut down based on the fact that e.g. the `PeerPool` shuts down or the `LightPeerChain` shuts down. However, this still leaves us with two problems:
`trinity stop-plugin <plugin-name>`
How was it fixed?
Understanding the sync vs async scenario
Before we dive into how exactly it was fixed, let's first take a moment to reflect on the three different kinds of operational models that we currently have for plugins.
1. Plugins that overtake the whole main process
This category of plugin doesn't need any special teardown treatment, as it basically breaks out of the standard trinity boot sequence early and entirely redefines what the trinity process even means.
2. Plugins that spawn their own process
The teardown for these plugins is similar to how we shut down other non-plugin processes.
py-evm/trinity/extensibility/plugin.py
Lines 180 to 182 in d225f4a
Notice that `kill_process_gracefully` is a blocking operation.
3. Plugins that run in a shared process (currently `networking`)
Plugins that run in the shared networking process are required to be written in a non-blocking fashion using `asyncio` (this is a MUST for builtin plugins and at least a strong recommendation for any user-defined plugin). Since these plugins run asynchronously, they need to be stopped asynchronously as well.
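The difference between a blocking and an asynchronous stop can be sketched as follows. The class names `BaseSyncStopPlugin` and `BaseAsyncStopPlugin` are the ones this PR introduces, but the method bodies and the helpers `stop_blocking` and `stop_async` are placeholders assumed for illustration:

```python
import asyncio
from typing import List


class BaseSyncStopPlugin:
    def stop(self) -> None:
        """Blocking teardown, e.g. waiting for a child process to exit."""


class BaseAsyncStopPlugin:
    async def stop(self) -> None:
        """Non-blocking teardown that has to be awaited on the event loop."""
        await asyncio.sleep(0)


def stop_blocking(plugins: List[BaseSyncStopPlugin]) -> int:
    for plugin in plugins:
        plugin.stop()  # returns only once the plugin is stopped
    return len(plugins)


async def stop_async(plugins: List[BaseAsyncStopPlugin]) -> int:
    for plugin in plugins:
        await plugin.stop()  # without `await`, nothing would actually run
    return len(plugins)


# Calling an async stop() without awaiting it only creates a coroutine object.
pending = BaseAsyncStopPlugin().stop()
is_coroutine = asyncio.iscoroutine(pending)
pending.close()  # discard it cleanly so no "never awaited" warning is emitted

sync_stopped = stop_blocking([BaseSyncStopPlugin()])
async_stopped = asyncio.run(stop_async([BaseAsyncStopPlugin()]))
```

This is why a synchronous shutdown path cannot simply call `stop()` on an async plugin: the call returns immediately without performing any teardown.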
Enter `shutdown` vs `shutdown_nowait`
Initially, I tried to streamline the shutdown of the `PluginManager` to either enforce being:
This has proven to be problematic for various reasons (I can elaborate if needed). So what I settled on instead is to come up with `shutdown` and `shutdown_nowait` (naming is in line with other places in our code base (and the Python stdlib) that use that pattern).
Then inside the `kill_trinity_gracefully()` function, we continue to call the synchronous version (only isolated plugins are managed by this plugin manager instance), and in the unwind routine of the `networking` process we await the async version of shutdown (only async plugins run in this plugin manager instance).
Refactorings
I refactored a bunch of stuff around plugins to make this possible:
- Because some plugins need no `stop()` at all, some need `async def stop()` and some need `def stop()`, this method was removed from the `BasePlugin`, and new derived plugins `BaseAsyncStopPlugin` as well as `BaseSyncStopPlugin` were defined accordingly.
- `PluginProcessScope` is no more. Otherwise it would be possible to e.g. derive from `BaseAsyncStopPlugin` but set `PluginProcessScope.MAIN`, which is a contradictory configuration.
- The `PluginManager` itself was refactored to rely more on composition. It now delegates to `scope.is_responsible_for_plugin(...)` and `scope.create_plugin_context(...)` instead of hardcoding if/else rules in methods of the `PluginManager` directly.
- A new `AsyncContextManager` (backport from Python 3.7) was created to allow more flexible teardown of event loops compared to what we previously had with `exit_on_signal`. The `exit_on_signal` helper does still exist but it is now built on top of said `AsyncContextManager`-based API.
Cute Animal Picture