Skip to content

gh-113538: Allow client connections to be closed #114432

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 14 commits into from
Mar 11, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions Doc/library/asyncio-eventloop.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1641,6 +1641,25 @@ Do not instantiate the :class:`Server` class directly.
coroutine to wait until the server is closed (and no more
connections are active).

.. method:: close_clients()

Close all existing incoming client connections.

Calls :meth:`~asyncio.BaseTransport.close` on all associated
transports.

.. versionadded:: 3.13

.. method:: abort_clients()

Close all existing incoming client connections immediately,
without waiting for pending operations to complete.

Calls :meth:`~asyncio.WriteTransport.abort` on all associated
transports.

.. versionadded:: 3.13

.. method:: get_loop()

Return the event loop associated with the server object.
Expand Down
5 changes: 5 additions & 0 deletions Doc/whatsnew/3.13.rst
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,11 @@ asyncio
the Unix socket when the server is closed.
(Contributed by Pierre Ossman in :gh:`111246`.)

* Add :meth:`asyncio.Server.close_clients` and
:meth:`asyncio.Server.abort_clients` methods which allows to more
forcefully close an asyncio server.
(Contributed by Pierre Ossman in :gh:`113538`.)

copy
----

Expand Down
24 changes: 16 additions & 8 deletions Lib/asyncio/base_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ def __init__(self, loop, sockets, protocol_factory, ssl_context, backlog,
ssl_handshake_timeout, ssl_shutdown_timeout=None):
self._loop = loop
self._sockets = sockets
self._active_count = 0
self._clients = set()
self._waiters = []
self._protocol_factory = protocol_factory
self._backlog = backlog
Expand All @@ -290,14 +290,14 @@ def __init__(self, loop, sockets, protocol_factory, ssl_context, backlog,
def __repr__(self):
return f'<{self.__class__.__name__} sockets={self.sockets!r}>'

def _attach(self):
def _attach(self, transport):
assert self._sockets is not None
self._active_count += 1
self._clients.add(transport)

def _detach(self):
assert self._active_count > 0
self._active_count -= 1
if self._active_count == 0 and self._sockets is None:
def _detach(self, transport):
assert transport in self._clients
self._clients.remove(transport)
if len(self._clients) == 0 and self._sockets is None:
self._wakeup()

def _wakeup(self):
Expand Down Expand Up @@ -346,9 +346,17 @@ def close(self):
self._serving_forever_fut.cancel()
self._serving_forever_fut = None

if self._active_count == 0:
if len(self._clients) == 0:
self._wakeup()

def close_clients(self):
for transport in self._clients.copy():
transport.close()

def abort_clients(self):
for transport in self._clients.copy():
transport.abort()

async def start_serving(self):
self._start_serving()
# Skip one loop iteration so that all 'loop.add_reader'
Expand Down
8 changes: 8 additions & 0 deletions Lib/asyncio/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,14 @@ def close(self):
"""Stop serving. This leaves existing connections open."""
raise NotImplementedError

def close_clients(self):
"""Close all active connections."""
raise NotImplementedError

def abort_clients(self):
"""Close all active connections immediately."""
raise NotImplementedError

def get_loop(self):
"""Get the event loop the Server object is attached to."""
raise NotImplementedError
Expand Down
4 changes: 2 additions & 2 deletions Lib/asyncio/proactor_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ def __init__(self, loop, sock, protocol, waiter=None,
self._called_connection_lost = False
self._eof_written = False
if self._server is not None:
self._server._attach()
self._server._attach(self)
self._loop.call_soon(self._protocol.connection_made, self)
if waiter is not None:
# only wake up the waiter when connection_made() has been called
Expand Down Expand Up @@ -167,7 +167,7 @@ def _call_connection_lost(self, exc):
self._sock = None
server = self._server
if server is not None:
server._detach()
server._detach(self)
self._server = None
self._called_connection_lost = True

Expand Down
4 changes: 2 additions & 2 deletions Lib/asyncio/selector_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -787,7 +787,7 @@ def __init__(self, loop, sock, protocol, extra=None, server=None):
self._paused = False # Set when pause_reading() called

if self._server is not None:
self._server._attach()
self._server._attach(self)
loop._transports[self._sock_fd] = self

def __repr__(self):
Expand Down Expand Up @@ -902,7 +902,7 @@ def _call_connection_lost(self, exc):
self._loop = None
server = self._server
if server is not None:
server._detach()
server._detach(self)
self._server = None

def get_write_buffer_size(self):
Expand Down
85 changes: 77 additions & 8 deletions Lib/test/test_asyncio/test_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,12 @@ async def main(srv):
class TestServer2(unittest.IsolatedAsyncioTestCase):

async def test_wait_closed_basic(self):
async def serve(*args):
pass
async def serve(rd, wr):
try:
await rd.read()
finally:
wr.close()
await wr.wait_closed()

srv = await asyncio.start_server(serve, socket_helper.HOSTv4, 0)
self.addCleanup(srv.close)
Expand All @@ -137,7 +141,8 @@ async def serve(*args):
self.assertFalse(task1.done())

# active count != 0, not closed: should block
srv._attach()
addr = srv.sockets[0].getsockname()
(rd, wr) = await asyncio.open_connection(addr[0], addr[1])
task2 = asyncio.create_task(srv.wait_closed())
await asyncio.sleep(0)
self.assertFalse(task1.done())
Expand All @@ -152,7 +157,8 @@ async def serve(*args):
self.assertFalse(task2.done())
self.assertFalse(task3.done())

srv._detach()
wr.close()
await wr.wait_closed()
# active count == 0, closed: should unblock
await task1
await task2
Expand All @@ -161,22 +167,85 @@ async def serve(*args):

async def test_wait_closed_race(self):
# Test a regression in 3.12.0, should be fixed in 3.12.1
async def serve(*args):
pass
async def serve(rd, wr):
try:
await rd.read()
finally:
wr.close()
await wr.wait_closed()

srv = await asyncio.start_server(serve, socket_helper.HOSTv4, 0)
self.addCleanup(srv.close)

task = asyncio.create_task(srv.wait_closed())
await asyncio.sleep(0)
self.assertFalse(task.done())
srv._attach()
addr = srv.sockets[0].getsockname()
(rd, wr) = await asyncio.open_connection(addr[0], addr[1])
loop = asyncio.get_running_loop()
loop.call_soon(srv.close)
loop.call_soon(srv._detach)
loop.call_soon(wr.close)
await srv.wait_closed()

async def test_close_clients(self):
async def serve(rd, wr):
try:
await rd.read()
finally:
wr.close()
await wr.wait_closed()

srv = await asyncio.start_server(serve, socket_helper.HOSTv4, 0)
self.addCleanup(srv.close)

addr = srv.sockets[0].getsockname()
(rd, wr) = await asyncio.open_connection(addr[0], addr[1])
self.addCleanup(wr.close)

task = asyncio.create_task(srv.wait_closed())
await asyncio.sleep(0)
self.assertFalse(task.done())

srv.close()
srv.close_clients()
await asyncio.sleep(0.1) # FIXME: flush call_soon()?
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, short sleeps are a nuisance in asyncio tests. Usually they can be replaced by a small number of sleep(0) calls though -- usually 1, rarely 2 or 3. sleep(0) is special and guarantees we go through the event loop exactly once.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A bunch of sleep(0) felt even more arbitrary. :/

What's your suggestion here? Keep it as is? Or something else?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A bunch of sleep(0) wastes less time (the asyncio tests are already too slow), and doesn't risk the test becoming flaky due to timing out on slow platforms (a problem we've struggled with). The right number of sleep(0) call takes up no more time than needed, and lets the machinery go through its motions in a deterministic matter. So I recommend sleep(0).

self.assertTrue(task.done())

async def test_abort_clients(self):
async def serve(rd, wr):
nonlocal s_rd, s_wr
s_rd = rd
s_wr = wr
await wr.wait_closed()

s_rd = s_wr = None
srv = await asyncio.start_server(serve, socket_helper.HOSTv4, 0)
self.addCleanup(srv.close)

addr = srv.sockets[0].getsockname()
(c_rd, c_wr) = await asyncio.open_connection(addr[0], addr[1])
self.addCleanup(c_wr.close)

# Make sure both sides are in a paused state
while (s_wr.transport.get_write_buffer_size() == 0 or
c_wr.transport.is_reading()):
while s_wr.transport.get_write_buffer_size() == 0:
s_wr.write(b'a' * 65536)
await asyncio.sleep(0)
await asyncio.sleep(0.1) # FIXME: More socket buffer space magically appears?
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Eh, what's going on with this one? Is it the same issue that can be fixed with a small number of sleep(0) calls, or different?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know, to be honest. Might be a kernel or libc issue where it shuffles buffers around and/or allocates more space.

Without it, I cannot reliably get both the server and client to a state where buffers are full. Which is needed for the test to check the right thing.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO this PR is not finished until you get to the bottom of that. If you really need then to reach a specific state and there's no deterministic way to get there, consider manipulating internal APIs.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was a pain, but I think I got it fixed. The core problem is that the kernel dynamically increases the socket send buffer size. And it takes about 20-30 ms to do so.

I think I've worked around that by specifying an explicit buffer size. That should turn off the dynamic resizing, if I remember things correctly.


task = asyncio.create_task(srv.wait_closed())
await asyncio.sleep(0)
self.assertFalse(task.done())

# Sanity check
self.assertNotEqual(s_wr.transport.get_write_buffer_size(), 0)
self.assertFalse(c_wr.transport.is_reading())

srv.close()
srv.abort_clients()
await asyncio.sleep(0.1) # FIXME: flush call_soon()?
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This one looks similar to the first sleep.

self.assertTrue(task.done())


# Test the various corner cases of Unix server socket removal
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Add :meth:`asyncio.Server.close_clients` and
:meth:`asyncio.Server.abort_clients` methods which allows to more forcefully
close an asyncio server.