Skip to content

Commit c9070d0

Browse files
authored
bpo-32662: Implement Server.start_serving() and Server.serve_forever() (#5312)
* bpo-32662: Implement Server.start_serving() and Server.serve_forever() New methods: * Server.start_serving(), * Server.serve_forever(), and * Server.is_serving(). Add 'start_serving' keyword parameter to loop.create_server() and loop.create_unix_server().
1 parent 1aa094f commit c9070d0

File tree

6 files changed

+334
-35
lines changed

6 files changed

+334
-35
lines changed

Doc/library/asyncio-eventloop.rst

Lines changed: 82 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -424,7 +424,7 @@ Creating connections
424424
Creating listening connections
425425
------------------------------
426426

427-
.. coroutinemethod:: AbstractEventLoop.create_server(protocol_factory, host=None, port=None, \*, family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, sock=None, backlog=100, ssl=None, reuse_address=None, reuse_port=None, ssl_handshake_timeout=None)
427+
.. coroutinemethod:: AbstractEventLoop.create_server(protocol_factory, host=None, port=None, \*, family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, sock=None, backlog=100, ssl=None, reuse_address=None, reuse_port=None, ssl_handshake_timeout=None, start_serving=True)
428428

429429
Create a TCP server (socket type :data:`~socket.SOCK_STREAM`) bound to
430430
*host* and *port*.
@@ -472,9 +472,15 @@ Creating listening connections
472472
for the SSL handshake to complete before aborting the connection.
473473
``10.0`` seconds if ``None`` (default).
474474

475+
* *start_serving* set to ``True`` (the default) causes the created server
476+
to start accepting connections immediately. When set to ``False``,
477+
the user should await on :meth:`Server.start_serving` or
478+
:meth:`Server.serve_forever` to make the server to start accepting
479+
connections.
480+
475481
.. versionadded:: 3.7
476482

477-
The *ssl_handshake_timeout* parameter.
483+
*ssl_handshake_timeout* and *start_serving* parameters.
478484

479485
.. versionchanged:: 3.5
480486

@@ -490,7 +496,7 @@ Creating listening connections
490496
The *host* parameter can now be a sequence of strings.
491497

492498

493-
.. coroutinemethod:: AbstractEventLoop.create_unix_server(protocol_factory, path=None, \*, sock=None, backlog=100, ssl=None, ssl_handshake_timeout=None)
499+
.. coroutinemethod:: AbstractEventLoop.create_unix_server(protocol_factory, path=None, \*, sock=None, backlog=100, ssl=None, ssl_handshake_timeout=None, start_serving=True)
494500

495501
Similar to :meth:`AbstractEventLoop.create_server`, but specific to the
496502
socket family :py:data:`~socket.AF_UNIX`.
@@ -929,8 +935,26 @@ Server
929935

930936
Server listening on sockets.
931937

932-
Object created by the :meth:`AbstractEventLoop.create_server` method and the
933-
:func:`start_server` function. Don't instantiate the class directly.
938+
Object created by :meth:`AbstractEventLoop.create_server`,
939+
:meth:`AbstractEventLoop.create_unix_server`, :func:`start_server`,
940+
and :func:`start_unix_server` functions. Don't instantiate the class
941+
directly.
942+
943+
*Server* objects are asynchronous context managers. When used in an
944+
``async with`` statement, it's guaranteed that the Server object is
945+
closed and not accepting new connections when the ``async with``
946+
statement is completed::
947+
948+
srv = await loop.create_server(...)
949+
950+
async with srv:
951+
# some code
952+
953+
# At this point, srv is closed and no longer accepts new connections.
954+
955+
956+
.. versionchanged:: 3.7
957+
Server object is an asynchronous context manager since Python 3.7.
934958

935959
.. method:: close()
936960

@@ -949,6 +973,54 @@ Server
949973

950974
.. versionadded:: 3.7
951975

976+
.. coroutinemethod:: start_serving()
977+
978+
Start accepting connections.
979+
980+
This method is idempotent, so it can be called when
981+
the server is already being serving.
982+
983+
The new *start_serving* keyword-only parameter to
984+
:meth:`AbstractEventLoop.create_server` and
985+
:meth:`asyncio.start_server` allows to create a Server object
986+
that is not accepting connections right away. In which case
987+
this method, or :meth:`Server.serve_forever` can be used
988+
to make the Server object to start accepting connections.
989+
990+
.. versionadded:: 3.7
991+
992+
.. coroutinemethod:: serve_forever()
993+
994+
Start accepting connections until the coroutine is cancelled.
995+
Cancellation of ``serve_forever`` task causes the server
996+
to be closed.
997+
998+
This method can be called if the server is already accepting
999+
connections. Only one ``serve_forever`` task can exist per
1000+
one *Server* object.
1001+
1002+
Example::
1003+
1004+
async def client_connected(reader, writer):
1005+
# Communicate with the client with
1006+
# reader/writer streams. For example:
1007+
await reader.readline()
1008+
1009+
async def main(host, port):
1010+
srv = await asyncio.start_server(
1011+
client_connected, host, port)
1012+
await loop.serve_forever()
1013+
1014+
asyncio.run(main('127.0.0.1', 0))
1015+
1016+
.. versionadded:: 3.7
1017+
1018+
.. method:: is_serving()
1019+
1020+
Return ``True`` if the server is accepting new connections.
1021+
1022+
.. versionadded:: 3.7
1023+
9521024
.. coroutinemethod:: wait_closed()
9531025

9541026
Wait until the :meth:`close` method completes.
@@ -958,6 +1030,11 @@ Server
9581030
List of :class:`socket.socket` objects the server is listening to, or
9591031
``None`` if the server is closed.
9601032

1033+
.. versionchanged:: 3.7
1034+
Prior to Python 3.7 ``Server.sockets`` used to return the
1035+
internal list of server's sockets directly. In 3.7 a copy
1036+
of that list is returned.
1037+
9611038

9621039
Handle
9631040
------

Lib/asyncio/base_events.py

Lines changed: 82 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -157,47 +157,106 @@ def _run_until_complete_cb(fut):
157157

158158
class Server(events.AbstractServer):
159159

160-
def __init__(self, loop, sockets):
160+
def __init__(self, loop, sockets, protocol_factory, ssl_context, backlog,
161+
ssl_handshake_timeout):
161162
self._loop = loop
162-
self.sockets = sockets
163+
self._sockets = sockets
163164
self._active_count = 0
164165
self._waiters = []
166+
self._protocol_factory = protocol_factory
167+
self._backlog = backlog
168+
self._ssl_context = ssl_context
169+
self._ssl_handshake_timeout = ssl_handshake_timeout
170+
self._serving = False
171+
self._serving_forever_fut = None
165172

166173
def __repr__(self):
167174
return f'<{self.__class__.__name__} sockets={self.sockets!r}>'
168175

169176
def _attach(self):
170-
assert self.sockets is not None
177+
assert self._sockets is not None
171178
self._active_count += 1
172179

173180
def _detach(self):
174181
assert self._active_count > 0
175182
self._active_count -= 1
176-
if self._active_count == 0 and self.sockets is None:
183+
if self._active_count == 0 and self._sockets is None:
177184
self._wakeup()
178185

186+
def _wakeup(self):
187+
waiters = self._waiters
188+
self._waiters = None
189+
for waiter in waiters:
190+
if not waiter.done():
191+
waiter.set_result(waiter)
192+
193+
def _start_serving(self):
194+
if self._serving:
195+
return
196+
self._serving = True
197+
for sock in self._sockets:
198+
sock.listen(self._backlog)
199+
self._loop._start_serving(
200+
self._protocol_factory, sock, self._ssl_context,
201+
self, self._backlog, self._ssl_handshake_timeout)
202+
203+
def get_loop(self):
204+
return self._loop
205+
206+
def is_serving(self):
207+
return self._serving
208+
209+
@property
210+
def sockets(self):
211+
if self._sockets is None:
212+
return []
213+
return list(self._sockets)
214+
179215
def close(self):
180-
sockets = self.sockets
216+
sockets = self._sockets
181217
if sockets is None:
182218
return
183-
self.sockets = None
219+
self._sockets = None
220+
184221
for sock in sockets:
185222
self._loop._stop_serving(sock)
223+
224+
self._serving = False
225+
226+
if (self._serving_forever_fut is not None and
227+
not self._serving_forever_fut.done()):
228+
self._serving_forever_fut.cancel()
229+
self._serving_forever_fut = None
230+
186231
if self._active_count == 0:
187232
self._wakeup()
188233

189-
def get_loop(self):
190-
return self._loop
234+
async def start_serving(self):
235+
self._start_serving()
191236

192-
def _wakeup(self):
193-
waiters = self._waiters
194-
self._waiters = None
195-
for waiter in waiters:
196-
if not waiter.done():
197-
waiter.set_result(waiter)
237+
async def serve_forever(self):
238+
if self._serving_forever_fut is not None:
239+
raise RuntimeError(
240+
f'server {self!r} is already being awaited on serve_forever()')
241+
if self._sockets is None:
242+
raise RuntimeError(f'server {self!r} is closed')
243+
244+
self._start_serving()
245+
self._serving_forever_fut = self._loop.create_future()
246+
247+
try:
248+
await self._serving_forever_fut
249+
except futures.CancelledError:
250+
try:
251+
self.close()
252+
await self.wait_closed()
253+
finally:
254+
raise
255+
finally:
256+
self._serving_forever_fut = None
198257

199258
async def wait_closed(self):
200-
if self.sockets is None or self._waiters is None:
259+
if self._sockets is None or self._waiters is None:
201260
return
202261
waiter = self._loop.create_future()
203262
self._waiters.append(waiter)
@@ -1059,7 +1118,8 @@ async def create_server(
10591118
ssl=None,
10601119
reuse_address=None,
10611120
reuse_port=None,
1062-
ssl_handshake_timeout=None):
1121+
ssl_handshake_timeout=None,
1122+
start_serving=True):
10631123
"""Create a TCP server.
10641124
10651125
The host parameter can be a string, in that case the TCP server is
@@ -1149,12 +1209,14 @@ async def create_server(
11491209
raise ValueError(f'A Stream Socket was expected, got {sock!r}')
11501210
sockets = [sock]
11511211

1152-
server = Server(self, sockets)
11531212
for sock in sockets:
1154-
sock.listen(backlog)
11551213
sock.setblocking(False)
1156-
self._start_serving(protocol_factory, sock, ssl, server, backlog,
1157-
ssl_handshake_timeout)
1214+
1215+
server = Server(self, sockets, protocol_factory,
1216+
ssl, backlog, ssl_handshake_timeout)
1217+
if start_serving:
1218+
server._start_serving()
1219+
11581220
if self._debug:
11591221
logger.info("%r is serving", server)
11601222
return server

Lib/asyncio/events.py

Lines changed: 43 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -164,13 +164,39 @@ def close(self):
164164
"""Stop serving. This leaves existing connections open."""
165165
raise NotImplementedError
166166

167+
def get_loop(self):
168+
"""Get the event loop the Server object is attached to."""
169+
raise NotImplementedError
170+
171+
def is_serving(self):
172+
"""Return True if the server is accepting connections."""
173+
raise NotImplementedError
174+
175+
async def start_serving(self):
176+
"""Start accepting connections.
177+
178+
This method is idempotent, so it can be called when
179+
the server is already being serving.
180+
"""
181+
raise NotImplementedError
182+
183+
async def serve_forever(self):
184+
"""Start accepting connections until the coroutine is cancelled.
185+
186+
The server is closed when the coroutine is cancelled.
187+
"""
188+
raise NotImplementedError
189+
167190
async def wait_closed(self):
168191
"""Coroutine to wait until service is closed."""
169192
raise NotImplementedError
170193

171-
def get_loop(self):
172-
""" Get the event loop the Server object is attached to."""
173-
raise NotImplementedError
194+
async def __aenter__(self):
195+
return self
196+
197+
async def __aexit__(self, *exc):
198+
self.close()
199+
await self.wait_closed()
174200

175201

176202
class AbstractEventLoop:
@@ -279,7 +305,8 @@ async def create_server(
279305
*, family=socket.AF_UNSPEC,
280306
flags=socket.AI_PASSIVE, sock=None, backlog=100,
281307
ssl=None, reuse_address=None, reuse_port=None,
282-
ssl_handshake_timeout=None):
308+
ssl_handshake_timeout=None,
309+
start_serving=True):
283310
"""A coroutine which creates a TCP server bound to host and port.
284311
285312
The return value is a Server object which can be used to stop
@@ -319,6 +346,11 @@ async def create_server(
319346
will wait for completion of the SSL handshake before aborting the
320347
connection. Default is 10s, longer timeouts may increase vulnerability
321348
to DoS attacks (see https://support.f5.com/csp/article/K13834)
349+
350+
start_serving set to True (default) causes the created server
351+
to start accepting connections immediately. When set to False,
352+
the user should await Server.start_serving() or Server.serve_forever()
353+
to make the server to start accepting connections.
322354
"""
323355
raise NotImplementedError
324356

@@ -343,7 +375,8 @@ async def create_unix_connection(
343375
async def create_unix_server(
344376
self, protocol_factory, path=None, *,
345377
sock=None, backlog=100, ssl=None,
346-
ssl_handshake_timeout=None):
378+
ssl_handshake_timeout=None,
379+
start_serving=True):
347380
"""A coroutine which creates a UNIX Domain Socket server.
348381
349382
The return value is a Server object, which can be used to stop
@@ -363,6 +396,11 @@ async def create_unix_server(
363396
364397
ssl_handshake_timeout is the time in seconds that an SSL server
365398
will wait for the SSL handshake to complete (defaults to 10s).
399+
400+
start_serving set to True (default) causes the created server
401+
to start accepting connections immediately. When set to False,
402+
the user should await Server.start_serving() or Server.serve_forever()
403+
to make the server to start accepting connections.
366404
"""
367405
raise NotImplementedError
368406

Lib/asyncio/unix_events.py

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -250,7 +250,8 @@ async def create_unix_connection(
250250
async def create_unix_server(
251251
self, protocol_factory, path=None, *,
252252
sock=None, backlog=100, ssl=None,
253-
ssl_handshake_timeout=None):
253+
ssl_handshake_timeout=None,
254+
start_serving=True):
254255
if isinstance(ssl, bool):
255256
raise TypeError('ssl argument must be an SSLContext or None')
256257

@@ -302,11 +303,12 @@ async def create_unix_server(
302303
raise ValueError(
303304
f'A UNIX Domain Stream Socket was expected, got {sock!r}')
304305

305-
server = base_events.Server(self, [sock])
306-
sock.listen(backlog)
307306
sock.setblocking(False)
308-
self._start_serving(protocol_factory, sock, ssl, server,
309-
ssl_handshake_timeout=ssl_handshake_timeout)
307+
server = base_events.Server(self, [sock], protocol_factory,
308+
ssl, backlog, ssl_handshake_timeout)
309+
if start_serving:
310+
server._start_serving()
311+
310312
return server
311313

312314
async def _sock_sendfile_native(self, sock, file, offset, count):

0 commit comments

Comments
 (0)