From a2dc98a35ede7d98f776bead1f1f4677e5608ae6 Mon Sep 17 00:00:00 2001 From: Ran Benita Date: Wed, 19 Apr 2023 12:47:07 +0300 Subject: [PATCH 01/13] gateway_base: remove _setupmessages() to allow for static analysis --- execnet/gateway_base.py | 85 ++++++++++++++++++++++------------------- 1 file changed, 46 insertions(+), 39 deletions(-) diff --git a/execnet/gateway_base.py b/execnet/gateway_base.py index fd3c7f63..3c3f1533 100644 --- a/execnet/gateway_base.py +++ b/execnet/gateway_base.py @@ -388,7 +388,8 @@ def close_write(self): class Message: """encapsulates Messages and their wire protocol.""" - _types: list[Callable[[Message, BaseGateway], None]] = [] + # message code -> name, handler + _types: dict[int, tuple[str, Callable[[Message, BaseGateway], None]]] = {} def __init__(self, msgcode, channelid=0, data=""): self.msgcode = msgcode @@ -412,21 +413,16 @@ def to_io(self, io): io.write(header + self.data) def received(self, gateway): - self._types[self.msgcode](self, gateway) + handler = self._types[self.msgcode][1] + handler(self, gateway) def __repr__(self): - name = self._types[self.msgcode].__name__.upper() + name = self._types[self.msgcode][0] return "".format( name, self.channelid, len(self.data) ) - -class GatewayReceivedTerminate(Exception): - """Receiverthread got termination message.""" - - -def _setupmessages(): - def status(message, gateway): + def _status(message, gateway): # we use the channelid to send back information # but don't instantiate a channel object d = { @@ -437,49 +433,60 @@ def status(message, gateway): gateway._send(Message.CHANNEL_DATA, message.channelid, dumps_internal(d)) gateway._send(Message.CHANNEL_CLOSE, message.channelid) - def channel_exec(message, gateway): + STATUS = 0 + _types[STATUS] = ("STATUS", _status) + + def _reconfigure(message, gateway): + if message.channelid == 0: + target = gateway + else: + target = gateway._channelfactory.new(message.channelid) + target._strconfig = loads_internal(message.data, gateway) + + RECONFIGURE = 1 + _types[RECONFIGURE] = ("RECONFIGURE", _reconfigure) + + def _gateway_terminate(message, gateway): + raise GatewayReceivedTerminate(gateway) + + GATEWAY_TERMINATE = 2 + _types[GATEWAY_TERMINATE] = ("GATEWAY_TERMINATE", _gateway_terminate) + + def _channel_exec(message, gateway): channel = gateway._channelfactory.new(message.channelid) gateway._local_schedulexec(channel=channel, sourcetask=message.data) - def channel_data(message, gateway): + CHANNEL_EXEC = 3 + _types[CHANNEL_EXEC] = ("CHANNEL_EXEC", _channel_exec) + + def _channel_data(message, gateway): gateway._channelfactory._local_receive(message.channelid, message.data) - def channel_close(message, gateway): + CHANNEL_DATA = 4 + _types[CHANNEL_DATA] = ("CHANNEL_DATA", _channel_data) + + def _channel_close(message, gateway): gateway._channelfactory._local_close(message.channelid) - def channel_close_error(message, gateway): + CHANNEL_CLOSE = 5 + _types[CHANNEL_CLOSE] = ("CHANNEL_CLOSE", _channel_close) + + def _channel_close_error(message, gateway): remote_error = RemoteError(loads_internal(message.data)) gateway._channelfactory._local_close(message.channelid, remote_error) - def channel_last_message(message, gateway): + CHANNEL_CLOSE_ERROR = 6 + _types[CHANNEL_CLOSE_ERROR] = ("CHANNEL_CLOSE_ERROR", _channel_close_error) + + def _channel_last_message(message, gateway): gateway._channelfactory._local_close(message.channelid, sendonly=True) - def gateway_terminate(message, gateway): - raise GatewayReceivedTerminate(gateway) + CHANNEL_LAST_MESSAGE = 7 + _types[CHANNEL_LAST_MESSAGE] = ("CHANNEL_LAST_MESSAGE", _channel_last_message) - def reconfigure(message, gateway): - if message.channelid == 0: - target = gateway - else: - target = gateway._channelfactory.new(message.channelid) - target._strconfig = loads_internal(message.data, gateway) - types = [ - status, - reconfigure, - gateway_terminate, - channel_exec, - channel_data, - channel_close, - channel_close_error, - channel_last_message, - ] - for i, handler in enumerate(types): - Message._types.append(handler) - setattr(Message, handler.__name__.upper(), i) - - -_setupmessages() +class GatewayReceivedTerminate(Exception): + """Receiverthread got termination message.""" def geterrortext(excinfo, format_exception=traceback.format_exception, sysex=sysex): From 9324d193799b33dc9372e47c9e2ac9dac3a26c55 Mon Sep 17 00:00:00 2001 From: Ran Benita Date: Wed, 19 Apr 2023 18:34:30 +0300 Subject: [PATCH 02/13] gateway_base: remove _buildopcodes() to allow for static analysis --- execnet/gateway_base.py | 94 +++++++++++++++++++++++++++++------------ 1 file changed, 66 insertions(+), 28 deletions(-) diff --git a/execnet/gateway_base.py b/execnet/gateway_base.py index 3c3f1533..1852536f 100644 --- a/execnet/gateway_base.py +++ b/execnet/gateway_base.py @@ -1107,10 +1107,34 @@ class _Stop(Exception): pass +class opcode: + """container for name -> num mappings.""" + + BUILDTUPLE = b"@" + BYTES = b"A" + CHANNEL = b"B" + FALSE = b"C" + FLOAT = b"D" + FROZENSET = b"E" + INT = b"F" + LONG = b"G" + LONGINT = b"H" + LONGLONG = b"I" + NEWDICT = b"J" + NEWLIST = b"K" + NONE = b"L" + PY2STRING = b"M" + PY3STRING = b"N" + SET = b"O" + SETITEM = b"P" + STOP = b"Q" + TRUE = b"R" + UNICODE = b"S" + COMPLEX = b"T" + + class Unserializer: - num2func: dict[ - int, Callable[[Unserializer], None] - ] = {} # is filled after this class definition + num2func: dict[bytes, Callable[[Unserializer], None]] = {} py2str_as_py3str = True # True py3str_as_py2str = False # false means py2 will get unicode @@ -1150,31 +1174,47 @@ def load(self, versioned=False): def load_none(self): self.stack.append(None) + num2func[opcode.NONE] = load_none + def load_true(self): self.stack.append(True) + num2func[opcode.TRUE] = load_true + def load_false(self): self.stack.append(False) + num2func[opcode.FALSE] = load_false + def load_int(self): i = self._read_int4() self.stack.append(i) + num2func[opcode.INT] = load_int + def load_longint(self): s = self._read_byte_string() self.stack.append(int(s)) + num2func[opcode.LONGINT] = load_longint + load_long = load_int + num2func[opcode.LONG] = load_long load_longlong = load_longint + num2func[opcode.LONGLONG] = load_longlong def load_float(self): binary = self.stream.read(FLOAT_FORMAT_SIZE) self.stack.append(struct.unpack(FLOAT_FORMAT, binary)[0]) + num2func[opcode.FLOAT] = load_float + def load_complex(self): binary = self.stream.read(COMPLEX_FORMAT_SIZE) self.stack.append(complex(*struct.unpack(COMPLEX_FORMAT, binary))) + num2func[opcode.COMPLEX] = load_complex + def _read_int4(self): return struct.unpack("!i", self.stream.read(4))[0] @@ -1191,6 +1231,8 @@ def load_py3string(self): else: self.stack.append(as_bytes.decode("utf-8")) + num2func[opcode.PY3STRING] = load_py3string + def load_py2string(self): as_bytes = self._read_byte_string() if self.py2str_as_py3str: @@ -1199,17 +1241,25 @@ def load_py2string(self): s = as_bytes self.stack.append(s) + num2func[opcode.PY2STRING] = load_py2string + def load_bytes(self): s = self._read_byte_string() self.stack.append(s) + num2func[opcode.BYTES] = load_bytes + def load_unicode(self): self.stack.append(self._read_byte_string().decode("utf-8")) + num2func[opcode.UNICODE] = load_unicode + def load_newlist(self): length = self._read_int4() self.stack.append([None] * length) + num2func[opcode.NEWLIST] = load_newlist + def load_setitem(self): if len(self.stack) < 3: raise LoadError("not enough items for setitem") @@ -1217,9 +1267,13 @@ def load_setitem(self): key = self.stack.pop() self.stack[-1][key] = value + num2func[opcode.SETITEM] = load_setitem + def load_newdict(self): self.stack.append({}) + num2func[opcode.NEWDICT] = load_newdict + def _load_collection(self, type_): length = self._read_int4() if length: @@ -1232,45 +1286,29 @@ def _load_collection(self, type_): def load_buildtuple(self): self._load_collection(tuple) + num2func[opcode.BUILDTUPLE] = load_buildtuple + def load_set(self): self._load_collection(set) + num2func[opcode.SET] = load_set + def load_frozenset(self): self._load_collection(frozenset) + num2func[opcode.FROZENSET] = load_frozenset + def load_stop(self): raise _Stop + num2func[opcode.STOP] = load_stop + def load_channel(self): id = self._read_int4() newchannel = self.channelfactory.new(id) self.stack.append(newchannel) - -# automatically build opcodes and byte-encoding - - -class opcode: - """container for name -> num mappings.""" - - -def _buildopcodes(): - l = [] - later_added = {"COMPLEX": 1} - for name, func in Unserializer.__dict__.items(): - if name.startswith("load_"): - opname = name[5:].upper() - l.append((opname, func)) - l.sort(key=lambda x: (later_added.get(x[0], 0), x[0])) - - for i, (opname, func) in enumerate(l): - assert i < 26, "xxx" - i = bchr(64 + i) - Unserializer.num2func[i] = func - setattr(opcode, opname, i) - - -_buildopcodes() + num2func[opcode.CHANNEL] = load_channel def dumps(obj): From 25c9b1d0db43b7c872ac33c6b5d678dbb9473925 Mon Sep 17 00:00:00 2001 From: Ran Benita Date: Wed, 19 Apr 2023 00:12:39 +0300 Subject: [PATCH 03/13] Remove various no longer needed compat code --- execnet/gateway_base.py | 20 +++++--------------- execnet/gateway_socket.py | 5 ----- execnet/multi.py | 3 +-- 3 files changed, 6 insertions(+), 22 deletions(-) diff --git a/execnet/gateway_base.py b/execnet/gateway_base.py index 1852536f..d3ab0a45 100644 --- a/execnet/gateway_base.py +++ b/execnet/gateway_base.py @@ -18,18 +18,11 @@ import sys import traceback import weakref +from _thread import interrupt_main from io import BytesIO from typing import Callable -def reraise(cls, val, tb): - raise val.with_traceback(tb) - - -unicode = str -_long_type = int -from _thread import interrupt_main - SUBPROCESS32 = False # f = open("/tmp/execnet-%s" % os.getpid(), "w") # def log_extra(*msg): @@ -45,11 +38,8 @@ def get_execmodel(backend): return backend if backend == "thread": importdef = { - "get_ident": ["thread::get_ident", "_thread::get_ident"], - "_start_new_thread": [ - "thread::start_new_thread", - "_thread::start_new_thread", - ], + "get_ident": ["_thread::get_ident"], + "_start_new_thread": ["_thread::start_new_thread"], "threading": ["threading"], "queue": ["queue"], "sleep": ["time::sleep"], @@ -178,7 +168,7 @@ def get(self, timeout=None): try: return self._result except AttributeError: - reraise(*(self._excinfo[:3])) # noqa + raise self._excinfo[1].with_traceback(self._excinfo[2]) def waitfinish(self, timeout=None): if not self._result_ready.wait(timeout): @@ -1385,7 +1375,7 @@ def save(self, obj, versioned=False): streamlist = self._streamlist except AttributeError: return None - return type(streamlist[0])().join(streamlist) + return b"".join(streamlist) def _save(self, obj): tp = type(obj) diff --git a/execnet/gateway_socket.py b/execnet/gateway_socket.py index 175112bc..4379e015 100644 --- a/execnet/gateway_socket.py +++ b/execnet/gateway_socket.py @@ -2,11 +2,6 @@ from execnet.gateway_bootstrap import HostNotFound -try: - bytes -except NameError: - bytes = str - class SocketIO: def __init__(self, sock, execmodel): diff --git a/execnet/multi.py b/execnet/multi.py index 838fd546..f7481cd0 100644 --- a/execnet/multi.py +++ b/execnet/multi.py @@ -11,7 +11,6 @@ from . import gateway_bootstrap from . import gateway_io from .gateway_base import get_execmodel -from .gateway_base import reraise from .gateway_base import trace from .xspec import XSpec @@ -289,7 +288,7 @@ def waitclose(self): if first is None: first = sys.exc_info() if first: - reraise(*first) + raise first[1].with_traceback(first[2]) def safe_terminate(execmodel, timeout, list_of_paired_functions): From e846c64a3f5da604c5a5b846e794b7cc6719b0c3 Mon Sep 17 00:00:00 2001 From: Ran Benita Date: Wed, 19 Apr 2023 00:40:59 +0300 Subject: [PATCH 04/13] gateway_base: fix incorrect type of Message(data) default --- execnet/gateway_base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/execnet/gateway_base.py b/execnet/gateway_base.py index d3ab0a45..e58ffe5f 100644 --- a/execnet/gateway_base.py +++ b/execnet/gateway_base.py @@ -381,7 +381,7 @@ class Message: # message code -> name, handler _types: dict[int, tuple[str, Callable[[Message, BaseGateway], None]]] = {} - def __init__(self, msgcode, channelid=0, data=""): + def __init__(self, msgcode, channelid=0, data=b""): self.msgcode = msgcode self.channelid = channelid self.data = data From da86c5d6a5a7f123b135153b1f4d4b2b6f374474 Mon Sep 17 00:00:00 2001 From: Ran Benita Date: Wed, 19 Apr 2023 00:44:21 +0300 Subject: [PATCH 05/13] Replace bare `except`s with `except BaseException` --- execnet/gateway_base.py | 6 +++--- execnet/script/shell.py | 4 ++-- execnet/script/socketserver.py | 2 +- execnet/script/xx.py | 2 +- 4 files changed, 7 insertions(+), 7 deletions(-) diff --git a/execnet/gateway_base.py b/execnet/gateway_base.py index e58ffe5f..c872e547 100644 --- a/execnet/gateway_base.py +++ b/execnet/gateway_base.py @@ -179,7 +179,7 @@ def run(self): try: try: self._result = func(*args, **kwargs) - except: + except BaseException: # sys may be already None when shutting down the interpreter if sys is not None: self._excinfo = sys.exc_info() @@ -485,7 +485,7 @@ def geterrortext(excinfo, format_exception=traceback.format_exception, sysex=sys errortext = "".join(l) except sysex: raise - except: + except BaseException: errortext = f"{excinfo[0].__name__}: {excinfo[1]}" return errortext @@ -1050,7 +1050,7 @@ def executetask(self, item): except KeyboardInterrupt: channel.close(INTERRUPT_TEXT) raise - except: + except BaseException: excinfo = self.exc_info() if not isinstance(excinfo[1], EOFError): if not channel.gateway._channelfactory.finished: diff --git a/execnet/script/shell.py b/execnet/script/shell.py index 4719b90a..a221756d 100755 --- a/execnet/script/shell.py +++ b/execnet/script/shell.py @@ -32,7 +32,7 @@ def clientside(): line = sock.recv(4096) sys.stdout.write(line) sys.stdout.flush() - except: + except BaseException: import traceback print(traceback.print_exc()) @@ -66,7 +66,7 @@ def run(self): try: try: exec(compile(line + "\n", "", "single")) - except: + except BaseException: print_exc() finally: sys.stdout = oldout diff --git a/execnet/script/socketserver.py b/execnet/script/socketserver.py index 6180bcd4..034b4346 100755 --- a/execnet/script/socketserver.py +++ b/execnet/script/socketserver.py @@ -94,7 +94,7 @@ def startserver(serversock, loop=False): exec_from_one_connection(serversock) except (KeyboardInterrupt, SystemExit): raise - except: + except BaseException: if debug: import traceback diff --git a/execnet/script/xx.py b/execnet/script/xx.py index fd514c13..687cc81e 100644 --- a/execnet/script/xx.py +++ b/execnet/script/xx.py @@ -7,6 +7,6 @@ try: hostport = sys.argv[1] -except: +except BaseException: hostport = ":8888" gw = register.ServerGateway(hostport) From a2aeb41bb984279b85ef5e2fc3c04e9619a263e8 Mon Sep 17 00:00:00 2001 From: Ran Benita Date: Wed, 19 Apr 2023 19:07:46 +0300 Subject: [PATCH 06/13] Use super().__init__() --- execnet/gateway_base.py | 2 +- execnet/gateway_io.py | 2 +- execnet/script/shell.py | 2 +- execnet/script/socketserverservice.py | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/execnet/gateway_base.py b/execnet/gateway_base.py index c872e547..69ca8051 100644 --- a/execnet/gateway_base.py +++ b/execnet/gateway_base.py @@ -494,8 +494,8 @@ class RemoteError(Exception): """Exception containing a stringified error from the other side.""" def __init__(self, formatted): + super().__init__() self.formatted = formatted - Exception.__init__(self) def __str__(self): return self.formatted diff --git a/execnet/gateway_io.py b/execnet/gateway_io.py index 84352655..ea0fc92d 100644 --- a/execnet/gateway_io.py +++ b/execnet/gateway_io.py @@ -18,7 +18,7 @@ class Popen2IOMaster(Popen2IO): def __init__(self, args, execmodel): self.popen = p = execmodel.PopenPiped(args) - Popen2IO.__init__(self, p.stdin, p.stdout, execmodel=execmodel) + super().__init__(p.stdin, p.stdout, execmodel=execmodel) def wait(self): try: diff --git a/execnet/script/shell.py b/execnet/script/shell.py index a221756d..f47cd4d3 100755 --- a/execnet/script/shell.py +++ b/execnet/script/shell.py @@ -43,7 +43,7 @@ def clientside(): class promptagent(Thread): def __init__(self, clientsock): print("server side starting") - Thread.__init__(self) + super.__init__() self.clientsock = clientsock def run(self): diff --git a/execnet/script/socketserverservice.py b/execnet/script/socketserverservice.py index c85f00fb..3d64f139 100644 --- a/execnet/script/socketserverservice.py +++ b/execnet/script/socketserverservice.py @@ -36,7 +36,7 @@ def __init__(self, args): win32evtlogutil.AddSourceToRegistry( self._svc_display_name_, servicemanager.__file__, "Application" ) - win32serviceutil.ServiceFramework.__init__(self, args) + super.__init__(args) self.hWaitStop = win32event.CreateEvent(None, 0, 0, None) self.WAIT_TIME = 1000 # in milliseconds From 9572fa6053529a6e74e0ea1b38a1b0e40b862bc5 Mon Sep 17 00:00:00 2001 From: Ran Benita Date: Wed, 19 Apr 2023 01:11:36 +0300 Subject: [PATCH 07/13] multi: fix typo in assert The `id` here referred to the builtin `id` function, that's not the intention. --- execnet/multi.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/execnet/multi.py b/execnet/multi.py index f7481cd0..28489dbd 100644 --- a/execnet/multi.py +++ b/execnet/multi.py @@ -173,7 +173,7 @@ def allocate_id(self, spec): def _register(self, gateway): assert not hasattr(gateway, "_group") assert gateway.id - assert id not in self + assert gateway.id not in self self._gateways.append(gateway) gateway._group = self From 39e2ba903a29e7fbf8e38b62d2a9a6ab7f163106 Mon Sep 17 00:00:00 2001 From: Ran Benita Date: Wed, 19 Apr 2023 18:48:53 +0300 Subject: [PATCH 08/13] gateway_base: fix typing typo --- execnet/gateway_base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/execnet/gateway_base.py b/execnet/gateway_base.py index 69ca8051..67011f53 100644 --- a/execnet/gateway_base.py +++ b/execnet/gateway_base.py @@ -1355,7 +1355,7 @@ def dumps_internal(obj): class _Serializer: - _dispatch = {object: Callable[["_Serializer", object], None]} + _dispatch: dict[type, Callable[[_Serializer, object], None]] = {} def __init__(self, write=None): if write is None: From 7ecb41fec3fc473d44d7380ed1e32ac2f320afac Mon Sep 17 00:00:00 2001 From: Ran Benita Date: Thu, 20 Apr 2023 23:54:01 +0300 Subject: [PATCH 09/13] gateway_base: remove on longer needed importdef fallbacks --- execnet/gateway_base.py | 86 ++++++++++++++++++++--------------------- 1 file changed, 42 insertions(+), 44 deletions(-) diff --git a/execnet/gateway_base.py b/execnet/gateway_base.py index 67011f53..66cb9c80 100644 --- a/execnet/gateway_base.py +++ b/execnet/gateway_base.py @@ -38,16 +38,16 @@ def get_execmodel(backend): return backend if backend == "thread": importdef = { - "get_ident": ["_thread::get_ident"], - "_start_new_thread": ["_thread::start_new_thread"], - "threading": ["threading"], - "queue": ["queue"], - "sleep": ["time::sleep"], - "subprocess": ["subprocess"], - "socket": ["socket"], - "_fdopen": ["os::fdopen"], - "_lock": ["threading"], - "_event": ["threading"], + "get_ident": "_thread::get_ident", + "_start_new_thread": "_thread::start_new_thread", + "threading": "threading", + "queue": "queue", + "sleep": "time::sleep", + "subprocess": "subprocess", + "socket": "socket", + "_fdopen": "os::fdopen", + "_lock": "threading", + "_event": "threading", } def exec_start(self, func, args=()): @@ -55,16 +55,16 @@ def exec_start(self, func, args=()): elif backend == "eventlet": importdef = { - "get_ident": ["eventlet.green.thread::get_ident"], - "_spawn_n": ["eventlet::spawn_n"], - "threading": ["eventlet.green.threading"], - "queue": ["eventlet.queue"], - "sleep": ["eventlet::sleep"], - "subprocess": ["eventlet.green.subprocess"], - "socket": ["eventlet.green.socket"], - "_fdopen": ["eventlet.green.os::fdopen"], - "_lock": ["eventlet.green.threading"], - "_event": ["eventlet.green.threading"], + "get_ident": "eventlet.green.thread::get_ident", + "_spawn_n": "eventlet::spawn_n", + "threading": "eventlet.green.threading", + "queue": "eventlet.queue", + "sleep": "eventlet::sleep", + "subprocess": "eventlet.green.subprocess", + "socket": "eventlet.green.socket", + "_fdopen": "eventlet.green.os::fdopen", + "_lock": "eventlet.green.threading", + "_event": "eventlet.green.threading", } def exec_start(self, func, args=()): @@ -72,17 +72,17 @@ def exec_start(self, func, args=()): elif backend == "gevent": importdef = { - "get_ident": ["gevent.thread::get_ident"], - "_spawn_n": ["gevent::spawn"], - "threading": ["threading"], - "queue": ["gevent.queue"], - "sleep": ["gevent::sleep"], - "subprocess": ["gevent.subprocess"], - "socket": ["gevent.socket"], + "get_ident": "gevent.thread::get_ident", + "_spawn_n": "gevent::spawn", + "threading": "threading", + "queue": "gevent.queue", + "sleep": "gevent::sleep", + "subprocess": "gevent.subprocess", + "socket": "gevent.socket", # XXX - "_fdopen": ["gevent.fileobject::FileObjectThread"], - "_lock": ["gevent.lock"], - "_event": ["gevent.event"], + "_fdopen": "gevent.fileobject::FileObjectThread", + "_lock": "gevent.lock", + "_event": "gevent.event", } def exec_start(self, func, args=()): @@ -101,21 +101,19 @@ def __repr__(self): return "" % self.backend def __getattr__(self, name): - locs = self._importdef.get(name) - if locs is None: + loc = self._importdef.get(name) + if loc is None: raise AttributeError(name) - for loc in locs: - parts = loc.split("::") - loc = parts.pop(0) - try: - mod = __import__(loc, None, None, "__doc__") - except ImportError: - pass - else: - if parts: - mod = getattr(mod, parts[0]) - setattr(self, name, mod) - return mod + parts = loc.split("::") + try: + mod = __import__(parts[0], None, None, "__doc__") + except ImportError: + pass + else: + if len(parts) > 1: + mod = getattr(mod, parts[1]) + setattr(self, name, mod) + return mod raise AttributeError(name) start = exec_start From e4f6090de02862fbb52291b16b3022df7ce3c433 Mon Sep 17 00:00:00 2001 From: Ran Benita Date: Fri, 21 Apr 2023 10:02:42 +0300 Subject: [PATCH 10/13] gateway_base: remove `threading` from exec model The gevent one is wrong (should be `gevent.threading`), which made it clear it's not used at all. --- execnet/gateway_base.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/execnet/gateway_base.py b/execnet/gateway_base.py index 66cb9c80..ed951503 100644 --- a/execnet/gateway_base.py +++ b/execnet/gateway_base.py @@ -40,7 +40,6 @@ def get_execmodel(backend): importdef = { "get_ident": "_thread::get_ident", "_start_new_thread": "_thread::start_new_thread", - "threading": "threading", "queue": "queue", "sleep": "time::sleep", "subprocess": "subprocess", @@ -57,7 +56,6 @@ def exec_start(self, func, args=()): importdef = { "get_ident": "eventlet.green.thread::get_ident", "_spawn_n": "eventlet::spawn_n", - "threading": "eventlet.green.threading", "queue": "eventlet.queue", "sleep": "eventlet::sleep", "subprocess": "eventlet.green.subprocess", @@ -74,7 +72,6 @@ def exec_start(self, func, args=()): importdef = { "get_ident": "gevent.thread::get_ident", "_spawn_n": "gevent::spawn", - "threading": "threading", "queue": "gevent.queue", "sleep": "gevent::sleep", "subprocess": "gevent.subprocess", From 6c56e88d48a34396e6a4aa0e96bc810ab07c8b8e Mon Sep 17 00:00:00 2001 From: Ran Benita Date: Fri, 21 Apr 2023 10:08:34 +0300 Subject: [PATCH 11/13] gateway_base: remove unused ExecModel `_count` attribute --- execnet/gateway_base.py | 1 - 1 file changed, 1 deletion(-) diff --git a/execnet/gateway_base.py b/execnet/gateway_base.py index ed951503..d6fc9e28 100644 --- a/execnet/gateway_base.py +++ b/execnet/gateway_base.py @@ -92,7 +92,6 @@ class ExecModel: def __init__(self, name): self._importdef = importdef self.backend = name - self._count = 0 def __repr__(self): return "" % self.backend From c2b9e60afdb5975b27ad9648a6d05ab9d6e41859 Mon Sep 17 00:00:00 2001 From: Ran Benita Date: Fri, 21 Apr 2023 00:28:17 +0300 Subject: [PATCH 12/13] gateway_base: remove execmodel.{WorkerPool,PopenPiped} `execmodel.WorkerPool` creates an unneeded dependency cycle. And it's not any better than the alternative really. `execmodel.PopenPiped` is a convenience that's not really needed at the abstraction layer, clearer to just inline it to the one place it's used. --- execnet/gateway_base.py | 11 ++--------- execnet/gateway_io.py | 3 ++- execnet/multi.py | 3 ++- 3 files changed, 6 insertions(+), 11 deletions(-) diff --git a/execnet/gateway_base.py b/execnet/gateway_base.py index d6fc9e28..c50e41b7 100644 --- a/execnet/gateway_base.py +++ b/execnet/gateway_base.py @@ -117,9 +117,6 @@ def __getattr__(self, name): def fdopen(self, fd, mode, bufsize=1): return self._fdopen(fd, mode, bufsize) - def WorkerPool(self, hasprimary=False): - return WorkerPool(self, hasprimary=hasprimary) - def Semaphore(self, size=None): if size is None: return EmptySemaphore() @@ -134,10 +131,6 @@ def RLock(self): def Event(self): return self._event.Event() - def PopenPiped(self, args): - PIPE = self.subprocess.PIPE - return self.subprocess.Popen(args, stdout=PIPE, stdin=PIPE) - return ExecModel(backend) @@ -909,7 +902,7 @@ def __init__(self, io, id, _startcount=2): # globals may be NONE at process-termination self.__trace = trace self._geterrortext = geterrortext - self._receivepool = self.execmodel.WorkerPool() + self._receivepool = WorkerPool(self.execmodel) def _trace(self, *msg): self.__trace(self.id, *msg) @@ -1011,7 +1004,7 @@ def trace(msg): self._trace("[serve] " + msg) hasprimary = self.execmodel.backend == "thread" - self._execpool = self.execmodel.WorkerPool(hasprimary=hasprimary) + self._execpool = WorkerPool(self.execmodel, hasprimary=hasprimary) trace("spawning receiver thread") self._initreceive() try: diff --git a/execnet/gateway_io.py b/execnet/gateway_io.py index ea0fc92d..c631f8d9 100644 --- a/execnet/gateway_io.py +++ b/execnet/gateway_io.py @@ -17,7 +17,8 @@ class Popen2IOMaster(Popen2IO): def __init__(self, args, execmodel): - self.popen = p = execmodel.PopenPiped(args) + PIPE = execmodel.subprocess.PIPE + self.popen = p = execmodel.subprocess.Popen(args, stdout=PIPE, stdin=PIPE) super().__init__(p.stdin, p.stdout, execmodel=execmodel) def wait(self): diff --git a/execnet/multi.py b/execnet/multi.py index 28489dbd..64e95017 100644 --- a/execnet/multi.py +++ b/execnet/multi.py @@ -12,6 +12,7 @@ from . import gateway_io from .gateway_base import get_execmodel from .gateway_base import trace +from .gateway_base import WorkerPool from .xspec import XSpec NO_ENDMARKER_WANTED = object() @@ -292,7 +293,7 @@ def waitclose(self): def safe_terminate(execmodel, timeout, list_of_paired_functions): - workerpool = execmodel.WorkerPool() + workerpool = WorkerPool(execmodel) def termkill(termfunc, killfunc): termreply = workerpool.spawn(termfunc) From fdf6b1e23aa14b14ca08119bfcbade5d87cb0846 Mon Sep 17 00:00:00 2001 From: Ran Benita Date: Fri, 21 Apr 2023 10:53:04 +0300 Subject: [PATCH 13/13] gateway_base: remove Semaphore from execmodel Not used since 120f63962390fb0c43278da5023b77b82d37b335. --- execnet/gateway_base.py | 9 --------- 1 file changed, 9 deletions(-) diff --git a/execnet/gateway_base.py b/execnet/gateway_base.py index c50e41b7..49f7e4a9 100644 --- a/execnet/gateway_base.py +++ b/execnet/gateway_base.py @@ -29,10 +29,6 @@ # f.write(" ".join([str(x) for x in msg]) + "\n") -class EmptySemaphore: - acquire = release = lambda self: None - - def get_execmodel(backend): if hasattr(backend, "backend"): return backend @@ -117,11 +113,6 @@ def __getattr__(self, name): def fdopen(self, fd, mode, bufsize=1): return self._fdopen(fd, mode, bufsize) - def Semaphore(self, size=None): - if size is None: - return EmptySemaphore() - return self._lock.Semaphore(size) - def Lock(self): return self._lock.RLock()