From 96c3d2a0703ffba5e3b6fbcef7b7e19387d4770a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Guyot?= Date: Wed, 5 Jul 2023 07:42:06 +0000 Subject: [PATCH 01/20] run pending in parallel --- src/pyk/proof/reachability.py | 63 +++++++++++++++++++---------------- 1 file changed, 34 insertions(+), 29 deletions(-) diff --git a/src/pyk/proof/reachability.py b/src/pyk/proof/reachability.py index 3b7a78317..6c0794445 100644 --- a/src/pyk/proof/reachability.py +++ b/src/pyk/proof/reachability.py @@ -5,6 +5,7 @@ from dataclasses import dataclass from itertools import chain from typing import TYPE_CHECKING +from multiprocessing import Pool from pyk.kore.rpc import LogEntry @@ -477,6 +478,7 @@ def advance_proof( cut_point_rules: Iterable[str] = (), terminal_rules: Iterable[str] = (), implication_every_block: bool = True, + max_workers: int = 1, ) -> KCFG: iterations = 0 @@ -487,38 +489,41 @@ def advance_proof( _LOGGER.warning(f'Reached iteration bound {self.proof.id}: {max_iterations}') break iterations += 1 - curr_node = self.proof.pending[0] - if self._check_subsume(curr_node): - continue - - if self._check_terminal(curr_node): - continue - - if self._check_abstract(curr_node): - continue + def _advance_from_node(nid: NodeIdLike) -> None: + _LOGGER.debug(f'advancing on node {nid}') + node = self.proof.kcfg.node(nid) + if self._check_subsume(node): + return + + if self._check_terminal(node): + return + + if self._check_abstract(node): + return + + if self._extract_branches is not None and len(self.proof.kcfg.splits(target_id=nid)) == 0: + branches = list(self._extract_branches(node.cterm)) + if len(branches) > 0: + self.proof.kcfg.split_on_constraints(nid, branches) + _LOGGER.info( + f'Found {len(branches)} branches using heuristic for node {self.proof.id}: {shorten_hashes(nid)}: {[self.kcfg_explore.kprint.pretty_print(bc) for bc in branches]}' + ) + return + + self.kcfg_explore.extend( + self.proof.kcfg, + node, + self.proof.logs, + execute_depth=execute_depth, + cut_point_rules=cut_point_rules, + terminal_rules=terminal_rules, + ) - if self._extract_branches is not None and len(self.proof.kcfg.splits(target_id=curr_node.id)) == 0: - branches = list(self._extract_branches(curr_node.cterm)) - if len(branches) > 0: - self.proof.kcfg.split_on_constraints(curr_node.id, branches) - _LOGGER.info( - f'Found {len(branches)} branches using heuristic for node {self.proof.id}: {shorten_hashes(curr_node.id)}: {[self.kcfg_explore.kprint.pretty_print(bc) for bc in branches]}' - ) - continue + curr_nodes = [node.id for ni, node in enumerate(self.proof.pending) if ni < max_workers] + pool = Pool(processes=max_workers) + pool.map(_advance_from_node, curr_nodes) - module_name = ( - self.circularities_module_name if self.nonzero_depth(curr_node) else self.dependencies_module_name - ) - self.kcfg_explore.extend( - self.proof.kcfg, - curr_node, - self.proof.logs, - execute_depth=execute_depth, - cut_point_rules=cut_point_rules, - terminal_rules=terminal_rules, - module_name=module_name, - ) self.proof.write_proof() return self.proof.kcfg From 5c9cb5a8b5e2d148efa6b01b4a0bbce450b9388f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Guyot?= Date: Wed, 5 Jul 2023 10:42:44 +0000 Subject: [PATCH 02/20] parallel advance --- src/pyk/proof/reachability.py | 63 ++++++++++++++++++----------------- 1 file changed, 32 insertions(+), 31 deletions(-) diff --git a/src/pyk/proof/reachability.py b/src/pyk/proof/reachability.py index 6c0794445..2d233190a 100644 --- a/src/pyk/proof/reachability.py +++ b/src/pyk/proof/reachability.py @@ -5,7 +5,7 @@ from dataclasses import dataclass from itertools import chain from typing import TYPE_CHECKING -from multiprocessing import Pool +from multiprocessing.pool import ThreadPool as Pool from pyk.kore.rpc import LogEntry @@ -482,6 +482,37 @@ def advance_proof( ) -> KCFG: iterations = 0 + def _advance_from_node(nid: NodeIdLike) -> None: + _LOGGER.info(f'advancing on node {nid}') + node = self.proof.kcfg.node(nid) + if self._check_subsume(node): + return + + if self._check_terminal(node): + return + + if self._check_abstract(node): + return + + if self._extract_branches is not None and len(self.proof.kcfg.splits(target_id=nid)) == 0: + branches = list(self._extract_branches(node.cterm)) + if len(branches) > 0: + self.proof.kcfg.split_on_constraints(nid, branches) + _LOGGER.info( + f'Found {len(branches)} branches using heuristic for node {self.proof.id}: {shorten_hashes(nid)}: {[self.kcfg_explore.kprint.pretty_print(bc) for bc in branches]}' + ) + return + + self.kcfg_explore.extend( + self.proof.kcfg, + node, + self.proof.logs, + execute_depth=execute_depth, + cut_point_rules=cut_point_rules, + terminal_rules=terminal_rules, + ) + + while self.proof.pending: self.proof.write_proof() @@ -490,36 +521,6 @@ def advance_proof( break iterations += 1 - def _advance_from_node(nid: NodeIdLike) -> None: - _LOGGER.debug(f'advancing on node {nid}') - node = self.proof.kcfg.node(nid) - if self._check_subsume(node): - return - - if self._check_terminal(node): - return - - if self._check_abstract(node): - return - - if self._extract_branches is not None and len(self.proof.kcfg.splits(target_id=nid)) == 0: - branches = list(self._extract_branches(node.cterm)) - if len(branches) > 0: - self.proof.kcfg.split_on_constraints(nid, branches) - _LOGGER.info( - f'Found {len(branches)} branches using heuristic for node {self.proof.id}: {shorten_hashes(nid)}: {[self.kcfg_explore.kprint.pretty_print(bc) for bc in branches]}' - ) - return - - self.kcfg_explore.extend( - self.proof.kcfg, - node, - self.proof.logs, - execute_depth=execute_depth, - cut_point_rules=cut_point_rules, - terminal_rules=terminal_rules, - ) - curr_nodes = [node.id for ni, node in enumerate(self.proof.pending) if ni < max_workers] pool = Pool(processes=max_workers) pool.map(_advance_from_node, curr_nodes) From ab1199d99e639c44bcdbe5e27daf0148eb0e4fa5 Mon Sep 17 00:00:00 2001 From: franfran Date: Wed, 5 Jul 2023 14:10:46 +0300 Subject: [PATCH 03/20] add multi-rpc support --- src/pyk/kcfg/explore.py | 14 +++++++------- src/pyk/kore/rpc.py | 8 +++++++- src/pyk/proof/reachability.py | 5 ++--- 3 files changed, 16 insertions(+), 11 deletions(-) diff --git a/src/pyk/kcfg/explore.py b/src/pyk/kcfg/explore.py index 7ff742763..be458e5b0 100644 --- a/src/pyk/kcfg/explore.py +++ b/src/pyk/kcfg/explore.py @@ -55,7 +55,7 @@ class KCFGExplore(ContextManager['KCFGExplore']): _bug_report: BugReport | None _kore_server: KoreServer | None - _kore_client: KoreClient | None + _kore_clients: list[KoreClient] _rpc_closed: bool _trace_rewrites: bool @@ -112,18 +112,18 @@ def _kore_rpc(self) -> tuple[KoreServer, KoreClient]: haskell_log_entries=self._haskell_log_entries, log_axioms_file=self._log_axioms_file, ) - if not self._kore_client: - self._kore_client = KoreClient('localhost', self._kore_server._port, bug_report=self._bug_report) - return (self._kore_server, self._kore_client) + if not self._kore_clients or not any(not client._busy for client in self._kore_clients): + self._kore_clients.append(KoreClient('localhost', self._kore_server._port, bug_report=self._bug_report)) + return (self._kore_server, next(client for client in self._kore_clients if not client._busy)) def close(self) -> None: self._rpc_closed = True if self._kore_server is not None: self._kore_server.close() self._kore_server = None - if self._kore_client is not None: - self._kore_client.close() - self._kore_client = None + while self._kore_clients: + client = self._kore_clients.pop() + client.close() def cterm_execute( self, diff --git a/src/pyk/kore/rpc.py b/src/pyk/kore/rpc.py index 55b3e29af..b247e6df4 100644 --- a/src/pyk/kore/rpc.py +++ b/src/pyk/kore/rpc.py @@ -527,6 +527,8 @@ class KoreClient(ContextManager['KoreClient']): _client: JsonRpcClient + _busy: bool = False + def __init__(self, host: str, port: int, *, timeout: int | None = None, bug_report: BugReport | None = None): self._client = JsonRpcClient(host, port, timeout=timeout, bug_report=bug_report) @@ -540,9 +542,13 @@ def close(self) -> None: self._client.close() def _request(self, method: str, **params: Any) -> dict[str, Any]: + self._busy = True try: - return self._client.request(method, **params) + res = self._client.request(method, **params) + self._busy = False + return res except JsonRpcError as err: + self._busy = False assert err.code not in {-32601, -32602}, 'Malformed Kore-RPC request' raise KoreClientError(message=err.message, code=err.code, data=err.data) from err diff --git a/src/pyk/proof/reachability.py b/src/pyk/proof/reachability.py index 2d233190a..9026add4e 100644 --- a/src/pyk/proof/reachability.py +++ b/src/pyk/proof/reachability.py @@ -4,8 +4,8 @@ import logging from dataclasses import dataclass from itertools import chain -from typing import TYPE_CHECKING from multiprocessing.pool import ThreadPool as Pool +from typing import TYPE_CHECKING from pyk.kore.rpc import LogEntry @@ -512,7 +512,6 @@ def _advance_from_node(nid: NodeIdLike) -> None: terminal_rules=terminal_rules, ) - while self.proof.pending: self.proof.write_proof() @@ -525,7 +524,6 @@ def _advance_from_node(nid: NodeIdLike) -> None: pool = Pool(processes=max_workers) pool.map(_advance_from_node, curr_nodes) - self.proof.write_proof() return self.proof.kcfg @@ -687,6 +685,7 @@ def advance_proof( cut_point_rules: Iterable[str] = (), terminal_rules: Iterable[str] = (), implication_every_block: bool = True, + max_workers: int = 1, ) -> KCFG: iterations = 0 From a41d69f6d758dc6d465ff41ddc26ad23a14f4a52 Mon Sep 17 00:00:00 2001 From: franfran Date: Wed, 5 Jul 2023 14:15:52 +0300 Subject: [PATCH 04/20] list init --- src/pyk/kcfg/explore.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/pyk/kcfg/explore.py b/src/pyk/kcfg/explore.py index be458e5b0..ba719e774 100644 --- a/src/pyk/kcfg/explore.py +++ b/src/pyk/kcfg/explore.py @@ -55,7 +55,7 @@ class KCFGExplore(ContextManager['KCFGExplore']): _bug_report: BugReport | None _kore_server: KoreServer | None - _kore_clients: list[KoreClient] + _kore_clients: list[KoreClient] = [] _rpc_closed: bool _trace_rewrites: bool From 48ac0a48b2848ecd515cf985727d0d96ab19bf44 Mon Sep 17 00:00:00 2001 From: devops Date: Wed, 5 Jul 2023 11:27:26 +0000 Subject: [PATCH 05/20] Set Version: 0.1.359 --- package/version | 2 +- pyproject.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/package/version b/package/version index 068ad42c2..8509341ee 100644 --- a/package/version +++ b/package/version @@ -1 +1 @@ -0.1.358 +0.1.359 diff --git a/pyproject.toml b/pyproject.toml index 17d67c2ce..096598aae 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api" [tool.poetry] name = "pyk" -version = "0.1.358" +version = "0.1.359" description = "" authors = [ "Runtime Verification, Inc. ", From 616044d28117e1d6f52d7c0f9c2328d700efe3f6 Mon Sep 17 00:00:00 2001 From: franfran Date: Wed, 5 Jul 2023 19:44:33 +0300 Subject: [PATCH 06/20] manage server locks --- src/pyk/kcfg/explore.py | 58 +++++++++++++++++++++++++++++------------ src/pyk/kore/rpc.py | 13 ++++++--- 2 files changed, 51 insertions(+), 20 deletions(-) diff --git a/src/pyk/kcfg/explore.py b/src/pyk/kcfg/explore.py index ba719e774..7fdb219ef 100644 --- a/src/pyk/kcfg/explore.py +++ b/src/pyk/kcfg/explore.py @@ -55,9 +55,11 @@ class KCFGExplore(ContextManager['KCFGExplore']): _bug_report: BugReport | None _kore_server: KoreServer | None + _kore_servers: list[KoreServer] = [] _kore_clients: list[KoreClient] = [] _rpc_closed: bool _trace_rewrites: bool + _max_clients: int def __init__( self, @@ -73,6 +75,7 @@ def __init__( haskell_log_entries: Iterable[str] = (), log_axioms_file: Path | None = None, trace_rewrites: bool = False, + _max_clients: int = 1, ): self.kprint = kprint self.id = id if id is not None else 'NO ID' @@ -88,6 +91,7 @@ def __init__( self._kore_client = None self._rpc_closed = False self._trace_rewrites = trace_rewrites + self._max_clients = _max_clients def __enter__(self) -> KCFGExplore: return self @@ -99,22 +103,44 @@ def __exit__(self, *args: Any) -> None: def _kore_rpc(self) -> tuple[KoreServer, KoreClient]: if self._rpc_closed: raise ValueError('RPC server already closed!') - if not self._kore_server: - self._kore_server = KoreServer( - self.kprint.definition_dir, - self.kprint.main_module, - port=self._port, - bug_report=self._bug_report, - command=self._kore_rpc_command, - smt_timeout=self._smt_timeout, - smt_retry_limit=self._smt_retry_limit, - haskell_log_format=self._haskell_log_format, - haskell_log_entries=self._haskell_log_entries, - log_axioms_file=self._log_axioms_file, - ) - if not self._kore_clients or not any(not client._busy for client in self._kore_clients): - self._kore_clients.append(KoreClient('localhost', self._kore_server._port, bug_report=self._bug_report)) - return (self._kore_server, next(client for client in self._kore_clients if not client._busy)) + if self._rpc_busy: + (server, client) = self._next_available_server() + elif not self._kore_servers or all(client._busy for client in self._kore_clients): + (server, client) = self._new_server() + self._kore_servers.append(server) + self._kore_clients.append(client) + else: + (i, client) = next((i, client) for i, client in enumerate(self._kore_clients) if not client._busy) + server = self._kore_servers[i] + return (server, client) + + @property + def _rpc_busy(self) -> bool: + return len(self._kore_clients) >= self._max_clients and all(client._busy for client in self._kore_clients) + + def _next_available_server(self) -> tuple[KoreServer, KoreClient]: + i = 0 + while True: + + + i = i % self._max_clients + 1 + + # TODO: don't use the same defined port for the KoreServer but a new one + def _new_server(self) -> tuple[KoreServer, KoreClient]: + server = KoreServer( + self.kprint.definition_dir, + self.kprint.main_module, + port=self._port, + bug_report=self._bug_report, + command=self._kore_rpc_command, + smt_timeout=self._smt_timeout, + smt_retry_limit=self._smt_retry_limit, + haskell_log_format=self._haskell_log_format, + haskell_log_entries=self._haskell_log_entries, + log_axioms_file=self._log_axioms_file, + ) + client = KoreClient('localhost', server._port, bug_report=self._bug_report) + return(server, client) def close(self) -> None: self._rpc_closed = True diff --git a/src/pyk/kore/rpc.py b/src/pyk/kore/rpc.py index b247e6df4..ad8d0724f 100644 --- a/src/pyk/kore/rpc.py +++ b/src/pyk/kore/rpc.py @@ -10,6 +10,7 @@ from pathlib import Path from signal import SIGINT from subprocess import Popen +import threading from time import sleep from typing import TYPE_CHECKING, ContextManager, final @@ -527,10 +528,11 @@ class KoreClient(ContextManager['KoreClient']): _client: JsonRpcClient - _busy: bool = False + _lock: threading.Lock def __init__(self, host: str, port: int, *, timeout: int | None = None, bug_report: BugReport | None = None): self._client = JsonRpcClient(host, port, timeout=timeout, bug_report=bug_report) + self._lock = threading.Lock() def __enter__(self) -> KoreClient: return self @@ -541,14 +543,17 @@ def __exit__(self, *args: Any) -> None: def close(self) -> None: self._client.close() + def _busy(self) -> bool: + return self._lock.locked() + def _request(self, method: str, **params: Any) -> dict[str, Any]: - self._busy = True + self._lock.acquire() try: res = self._client.request(method, **params) - self._busy = False + self._lock.release() return res except JsonRpcError as err: - self._busy = False + self._lock.release() assert err.code not in {-32601, -32602}, 'Malformed Kore-RPC request' raise KoreClientError(message=err.message, code=err.code, data=err.data) from err From 8770b938a3969dbdc1017b951947c1d51682efb0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Guyot?= Date: Wed, 5 Jul 2023 17:51:29 +0000 Subject: [PATCH 07/20] acquire lock --- src/pyk/kcfg/explore.py | 7 +++++-- src/pyk/kore/rpc.py | 8 +++++--- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/src/pyk/kcfg/explore.py b/src/pyk/kcfg/explore.py index 7fdb219ef..03b473de1 100644 --- a/src/pyk/kcfg/explore.py +++ b/src/pyk/kcfg/explore.py @@ -109,8 +109,10 @@ def _kore_rpc(self) -> tuple[KoreServer, KoreClient]: (server, client) = self._new_server() self._kore_servers.append(server) self._kore_clients.append(client) + client._lock.acquire() else: (i, client) = next((i, client) for i, client in enumerate(self._kore_clients) if not client._busy) + client._lock.acquire() server = self._kore_servers[i] return (server, client) @@ -121,9 +123,10 @@ def _rpc_busy(self) -> bool: def _next_available_server(self) -> tuple[KoreServer, KoreClient]: i = 0 while True: + if self._kore_clients[i]._lock.acquire(timeout=5): + return (self._kore_servers[i], self._kore_clients[i]) - - i = i % self._max_clients + 1 + i = (i + 1) % len(self._kore_clients) # TODO: don't use the same defined port for the KoreServer but a new one def _new_server(self) -> tuple[KoreServer, KoreClient]: diff --git a/src/pyk/kore/rpc.py b/src/pyk/kore/rpc.py index ad8d0724f..092b41ef6 100644 --- a/src/pyk/kore/rpc.py +++ b/src/pyk/kore/rpc.py @@ -547,13 +547,10 @@ def _busy(self) -> bool: return self._lock.locked() def _request(self, method: str, **params: Any) -> dict[str, Any]: - self._lock.acquire() try: res = self._client.request(method, **params) - self._lock.release() return res except JsonRpcError as err: - self._lock.release() assert err.code not in {-32601, -32602}, 'Malformed Kore-RPC request' raise KoreClientError(message=err.message, code=err.code, data=err.data) from err @@ -593,6 +590,7 @@ def execute( ) result = self._request('execute', **params) + self._lock.release() return ExecuteResult.from_dict(result) def implies( @@ -612,6 +610,7 @@ def implies( ) result = self._request('implies', **params) + self._lock.release() return ImpliesResult.from_dict(result) def simplify( @@ -629,6 +628,7 @@ def simplify( ) result = self._request('simplify', **params) + self._lock.release() logs = tuple(LogEntry.from_dict(l) for l in result['logs']) if 'logs' in result else () return kore_term(result['state'], Pattern), logs # type: ignore # https://github.com/python/mypy/issues/4717 @@ -641,10 +641,12 @@ def get_model(self, pattern: Pattern, module_name: str | None = None) -> GetMode ) result = self._request('get-model', **params) + self._lock.release() return GetModelResult.from_dict(result) def add_module(self, module: Module) -> None: result = self._request('add-module', module=module.text) + self._lock.release() assert result == [] From a00b4a420776d97b3d12bc0d051125c218b7e417 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Guyot?= Date: Wed, 5 Jul 2023 18:05:19 +0000 Subject: [PATCH 08/20] truth boolean check --- src/pyk/kcfg/explore.py | 6 +++--- src/pyk/kore/rpc.py | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/pyk/kcfg/explore.py b/src/pyk/kcfg/explore.py index 03b473de1..f74a001bd 100644 --- a/src/pyk/kcfg/explore.py +++ b/src/pyk/kcfg/explore.py @@ -111,7 +111,7 @@ def _kore_rpc(self) -> tuple[KoreServer, KoreClient]: self._kore_clients.append(client) client._lock.acquire() else: - (i, client) = next((i, client) for i, client in enumerate(self._kore_clients) if not client._busy) + (i, client) = next((i, client) for i, client in enumerate(self._kore_clients) if client._busy is False) client._lock.acquire() server = self._kore_servers[i] return (server, client) @@ -126,7 +126,7 @@ def _next_available_server(self) -> tuple[KoreServer, KoreClient]: if self._kore_clients[i]._lock.acquire(timeout=5): return (self._kore_servers[i], self._kore_clients[i]) - i = (i + 1) % len(self._kore_clients) + i = (i + 1) % len(self._kore_clients) # TODO: don't use the same defined port for the KoreServer but a new one def _new_server(self) -> tuple[KoreServer, KoreClient]: @@ -143,7 +143,7 @@ def _new_server(self) -> tuple[KoreServer, KoreClient]: log_axioms_file=self._log_axioms_file, ) client = KoreClient('localhost', server._port, bug_report=self._bug_report) - return(server, client) + return (server, client) def close(self) -> None: self._rpc_closed = True diff --git a/src/pyk/kore/rpc.py b/src/pyk/kore/rpc.py index 092b41ef6..8ef625a15 100644 --- a/src/pyk/kore/rpc.py +++ b/src/pyk/kore/rpc.py @@ -3,6 +3,7 @@ import json import logging import socket +import threading from abc import ABC, abstractmethod from dataclasses import dataclass from datetime import datetime, timedelta @@ -10,7 +11,6 @@ from pathlib import Path from signal import SIGINT from subprocess import Popen -import threading from time import sleep from typing import TYPE_CHECKING, ContextManager, final From 4565a42cf5821dd6bab4071ba348bc9b81a594a9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Guyot?= Date: Wed, 5 Jul 2023 19:41:54 +0000 Subject: [PATCH 09/20] acquire new --- src/pyk/kcfg/explore.py | 22 ++++++++++++++-------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/src/pyk/kcfg/explore.py b/src/pyk/kcfg/explore.py index f74a001bd..3189889f4 100644 --- a/src/pyk/kcfg/explore.py +++ b/src/pyk/kcfg/explore.py @@ -103,29 +103,35 @@ def __exit__(self, *args: Any) -> None: def _kore_rpc(self) -> tuple[KoreServer, KoreClient]: if self._rpc_closed: raise ValueError('RPC server already closed!') - if self._rpc_busy: - (server, client) = self._next_available_server() - elif not self._kore_servers or all(client._busy for client in self._kore_clients): + curr_server = self._curr_server # need interim because of lock + if self._kore_clients and curr_server is not None: + (server, client) = curr_server + elif len(self._kore_clients) < self._max_clients: (server, client) = self._new_server() + client._lock.acquire(blocking=True) self._kore_servers.append(server) self._kore_clients.append(client) - client._lock.acquire() else: - (i, client) = next((i, client) for i, client in enumerate(self._kore_clients) if client._busy is False) - client._lock.acquire() - server = self._kore_servers[i] + (server, client) = self._next_available_server() + print(client._client._port) return (server, client) @property def _rpc_busy(self) -> bool: return len(self._kore_clients) >= self._max_clients and all(client._busy for client in self._kore_clients) + @property + def _curr_server(self) -> tuple[KoreServer, KoreClient] | None: + i, client = next(((i, client) for i, client in enumerate(self._kore_clients) if self._kore_clients[i]._lock.acquire(blocking=False)), (0, None)) + if client is not None: + return (self._kore_servers[i], self._kore_clients[i]) + return None + def _next_available_server(self) -> tuple[KoreServer, KoreClient]: i = 0 while True: if self._kore_clients[i]._lock.acquire(timeout=5): return (self._kore_servers[i], self._kore_clients[i]) - i = (i + 1) % len(self._kore_clients) # TODO: don't use the same defined port for the KoreServer but a new one From 526710d8c6712632b5178699802121f436257fa6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Guyot?= Date: Wed, 5 Jul 2023 19:44:23 +0000 Subject: [PATCH 10/20] fmt --- src/pyk/kcfg/explore.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/src/pyk/kcfg/explore.py b/src/pyk/kcfg/explore.py index 3189889f4..53acaa4e6 100644 --- a/src/pyk/kcfg/explore.py +++ b/src/pyk/kcfg/explore.py @@ -103,7 +103,7 @@ def __exit__(self, *args: Any) -> None: def _kore_rpc(self) -> tuple[KoreServer, KoreClient]: if self._rpc_closed: raise ValueError('RPC server already closed!') - curr_server = self._curr_server # need interim because of lock + curr_server = self._curr_server # need interim because of lock if self._kore_clients and curr_server is not None: (server, client) = curr_server elif len(self._kore_clients) < self._max_clients: @@ -122,7 +122,14 @@ def _rpc_busy(self) -> bool: @property def _curr_server(self) -> tuple[KoreServer, KoreClient] | None: - i, client = next(((i, client) for i, client in enumerate(self._kore_clients) if self._kore_clients[i]._lock.acquire(blocking=False)), (0, None)) + i, client = next( + ( + (i, client) + for i, client in enumerate(self._kore_clients) + if self._kore_clients[i]._lock.acquire(blocking=False) + ), + (0, None), + ) if client is not None: return (self._kore_servers[i], self._kore_clients[i]) return None From 1930c525466c78e6492172a867e9ed9a8c45b26b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Guyot?= Date: Thu, 6 Jul 2023 07:18:19 +0000 Subject: [PATCH 11/20] remove _busy --- src/pyk/kcfg/explore.py | 8 +------- src/pyk/kore/rpc.py | 3 --- 2 files changed, 1 insertion(+), 10 deletions(-) diff --git a/src/pyk/kcfg/explore.py b/src/pyk/kcfg/explore.py index 53acaa4e6..806374719 100644 --- a/src/pyk/kcfg/explore.py +++ b/src/pyk/kcfg/explore.py @@ -103,7 +103,7 @@ def __exit__(self, *args: Any) -> None: def _kore_rpc(self) -> tuple[KoreServer, KoreClient]: if self._rpc_closed: raise ValueError('RPC server already closed!') - curr_server = self._curr_server # need interim because of lock + curr_server = self._curr_server() # need interim because of lock if self._kore_clients and curr_server is not None: (server, client) = curr_server elif len(self._kore_clients) < self._max_clients: @@ -113,14 +113,8 @@ def _kore_rpc(self) -> tuple[KoreServer, KoreClient]: self._kore_clients.append(client) else: (server, client) = self._next_available_server() - print(client._client._port) return (server, client) - @property - def _rpc_busy(self) -> bool: - return len(self._kore_clients) >= self._max_clients and all(client._busy for client in self._kore_clients) - - @property def _curr_server(self) -> tuple[KoreServer, KoreClient] | None: i, client = next( ( diff --git a/src/pyk/kore/rpc.py b/src/pyk/kore/rpc.py index 8ef625a15..c2d3b53c5 100644 --- a/src/pyk/kore/rpc.py +++ b/src/pyk/kore/rpc.py @@ -543,9 +543,6 @@ def __exit__(self, *args: Any) -> None: def close(self) -> None: self._client.close() - def _busy(self) -> bool: - return self._lock.locked() - def _request(self, method: str, **params: Any) -> dict[str, Any]: try: res = self._client.request(method, **params) From d8b1b464b215f2c12e2b883c7e85ff23a4cf0ebc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Guyot?= Date: Thu, 6 Jul 2023 07:40:36 +0000 Subject: [PATCH 12/20] free memory after use --- src/pyk/kcfg/explore.py | 13 +++++-------- src/pyk/proof/reachability.py | 2 +- 2 files changed, 6 insertions(+), 9 deletions(-) diff --git a/src/pyk/kcfg/explore.py b/src/pyk/kcfg/explore.py index 806374719..a5d6aa226 100644 --- a/src/pyk/kcfg/explore.py +++ b/src/pyk/kcfg/explore.py @@ -54,7 +54,6 @@ class KCFGExplore(ContextManager['KCFGExplore']): _smt_retry_limit: int | None _bug_report: BugReport | None - _kore_server: KoreServer | None _kore_servers: list[KoreServer] = [] _kore_clients: list[KoreClient] = [] _rpc_closed: bool @@ -75,7 +74,7 @@ def __init__( haskell_log_entries: Iterable[str] = (), log_axioms_file: Path | None = None, trace_rewrites: bool = False, - _max_clients: int = 1, + max_clients: int = 1, ): self.kprint = kprint self.id = id if id is not None else 'NO ID' @@ -87,11 +86,9 @@ def __init__( self._haskell_log_format = haskell_log_format self._haskell_log_entries = haskell_log_entries self._log_axioms_file = log_axioms_file - self._kore_server = None - self._kore_client = None self._rpc_closed = False self._trace_rewrites = trace_rewrites - self._max_clients = _max_clients + self._max_clients = max_clients def __enter__(self) -> KCFGExplore: return self @@ -154,9 +151,9 @@ def _new_server(self) -> tuple[KoreServer, KoreClient]: def close(self) -> None: self._rpc_closed = True - if self._kore_server is not None: - self._kore_server.close() - self._kore_server = None + while self._kore_servers: + server = self._kore_servers.pop() + server.close() while self._kore_clients: client = self._kore_clients.pop() client.close() diff --git a/src/pyk/proof/reachability.py b/src/pyk/proof/reachability.py index 9026add4e..28ad9a194 100644 --- a/src/pyk/proof/reachability.py +++ b/src/pyk/proof/reachability.py @@ -483,7 +483,6 @@ def advance_proof( iterations = 0 def _advance_from_node(nid: NodeIdLike) -> None: - _LOGGER.info(f'advancing on node {nid}') node = self.proof.kcfg.node(nid) if self._check_subsume(node): return @@ -723,6 +722,7 @@ def advance_proof( cut_point_rules=cut_point_rules, terminal_rules=terminal_rules, implication_every_block=implication_every_block, + max_workers=max_workers ) self.proof.write_proof() From 79ae610b1ea1c1e6e44e1ef68c27f55c79580575 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Guyot?= Date: Thu, 6 Jul 2023 08:06:40 +0000 Subject: [PATCH 13/20] optionally release --- src/pyk/kore/rpc.py | 13 ++++++++----- src/pyk/proof/reachability.py | 2 +- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/src/pyk/kore/rpc.py b/src/pyk/kore/rpc.py index c2d3b53c5..4b8e97acd 100644 --- a/src/pyk/kore/rpc.py +++ b/src/pyk/kore/rpc.py @@ -543,11 +543,19 @@ def __exit__(self, *args: Any) -> None: def close(self) -> None: self._client.close() + def _try_release(self) -> None: + try: + self._lock.release() + except RuntimeError: + pass + def _request(self, method: str, **params: Any) -> dict[str, Any]: try: res = self._client.request(method, **params) + self._try_release() return res except JsonRpcError as err: + self._try_release() assert err.code not in {-32601, -32602}, 'Malformed Kore-RPC request' raise KoreClientError(message=err.message, code=err.code, data=err.data) from err @@ -587,7 +595,6 @@ def execute( ) result = self._request('execute', **params) - self._lock.release() return ExecuteResult.from_dict(result) def implies( @@ -607,7 +614,6 @@ def implies( ) result = self._request('implies', **params) - self._lock.release() return ImpliesResult.from_dict(result) def simplify( @@ -625,7 +631,6 @@ def simplify( ) result = self._request('simplify', **params) - self._lock.release() logs = tuple(LogEntry.from_dict(l) for l in result['logs']) if 'logs' in result else () return kore_term(result['state'], Pattern), logs # type: ignore # https://github.com/python/mypy/issues/4717 @@ -638,12 +643,10 @@ def get_model(self, pattern: Pattern, module_name: str | None = None) -> GetMode ) result = self._request('get-model', **params) - self._lock.release() return GetModelResult.from_dict(result) def add_module(self, module: Module) -> None: result = self._request('add-module', module=module.text) - self._lock.release() assert result == [] diff --git a/src/pyk/proof/reachability.py b/src/pyk/proof/reachability.py index 28ad9a194..e0a02b870 100644 --- a/src/pyk/proof/reachability.py +++ b/src/pyk/proof/reachability.py @@ -722,7 +722,7 @@ def advance_proof( cut_point_rules=cut_point_rules, terminal_rules=terminal_rules, implication_every_block=implication_every_block, - max_workers=max_workers + max_workers=max_workers, ) self.proof.write_proof() From ca8a44abf35d4d55c3972f956f24ca6a2aa0a96c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Guyot?= Date: Thu, 6 Jul 2023 21:14:25 +0000 Subject: [PATCH 14/20] async dispatch --- src/pyk/kcfg/explore.py | 36 +++++++++++++++++++++++++++-------- src/pyk/kore/rpc.py | 1 + src/pyk/ktool/kompile.py | 11 ++++++++++- src/pyk/proof/reachability.py | 32 +++++++++++++++++-------------- 4 files changed, 57 insertions(+), 23 deletions(-) diff --git a/src/pyk/kcfg/explore.py b/src/pyk/kcfg/explore.py index a5d6aa226..307995596 100644 --- a/src/pyk/kcfg/explore.py +++ b/src/pyk/kcfg/explore.py @@ -101,17 +101,31 @@ def _kore_rpc(self) -> tuple[KoreServer, KoreClient]: if self._rpc_closed: raise ValueError('RPC server already closed!') curr_server = self._curr_server() # need interim because of lock - if self._kore_clients and curr_server is not None: + if curr_server is not None: (server, client) = curr_server elif len(self._kore_clients) < self._max_clients: (server, client) = self._new_server() - client._lock.acquire(blocking=True) + client._lock.acquire() self._kore_servers.append(server) self._kore_clients.append(client) else: (server, client) = self._next_available_server() return (server, client) + @property + def _kore_rpcs(self) -> list[tuple[KoreServer, KoreClient]]: + if self._rpc_closed: + raise ValueError('RPC server already closed!') + res = self._all_servers() + new_servers = [] + while len(self._kore_servers) < self._max_clients: + (server, client) = self._new_server() + client._lock.acquire() + self._kore_servers.append(server) + self._kore_clients.append(client) + new_servers.append((server, client)) + return res + new_servers + def _curr_server(self) -> tuple[KoreServer, KoreClient] | None: i, client = next( ( @@ -132,6 +146,12 @@ def _next_available_server(self) -> tuple[KoreServer, KoreClient]: return (self._kore_servers[i], self._kore_clients[i]) i = (i + 1) % len(self._kore_clients) + def _all_servers(self) -> list[tuple[KoreServer, KoreClient]]: + acquired = [] + while len(acquired) < len(self._kore_servers): + acquired.append(self._next_available_server()) + return acquired + # TODO: don't use the same defined port for the KoreServer but a new one def _new_server(self) -> tuple[KoreServer, KoreClient]: server = KoreServer( @@ -532,9 +552,9 @@ def add_dependencies_module( for c in dependencies ] kore_axioms: list[Sentence] = [krule_to_kore(self.kprint.kompiled_kore, r) for r in kast_rules] - _, kore_client = self._kore_rpc - sentences: list[Sentence] = [Import(module_name=old_module_name, attrs=())] - sentences = sentences + kore_axioms - m = Module(name=new_module_name, sentences=sentences) - _LOGGER.info(f'Adding dependencies module {self.id}: {new_module_name}') - kore_client.add_module(m) + for _, kore_client in self._kore_rpcs: + sentences: list[Sentence] = [Import(module_name=old_module_name, attrs=())] + sentences = sentences + kore_axioms + m = Module(name=new_module_name, sentences=sentences) + _LOGGER.info(f'Adding dependencies module {self.id}: {new_module_name}') + kore_client.add_module(m) diff --git a/src/pyk/kore/rpc.py b/src/pyk/kore/rpc.py index 4b8e97acd..a56a9e3fb 100644 --- a/src/pyk/kore/rpc.py +++ b/src/pyk/kore/rpc.py @@ -95,6 +95,7 @@ def __exit__(self, *args: Any) -> None: def close(self) -> None: self._file.close() + self._sock.shutdown(socket.SHUT_RDWR) self._sock.close() def request(self, method: str, **params: Any) -> dict[str, Any]: diff --git a/src/pyk/ktool/kompile.py b/src/pyk/ktool/kompile.py index b4601716c..c08429a33 100644 --- a/src/pyk/ktool/kompile.py +++ b/src/pyk/ktool/kompile.py @@ -29,6 +29,7 @@ def kompile( output_dir: str | Path | None = None, temp_dir: str | Path | None = None, debug: bool = False, + verbose: bool = False, cwd: Path | None = None, check: bool = True, **kwargs: Any, @@ -40,6 +41,7 @@ def kompile( output_dir=output_dir, temp_dir=temp_dir, debug=debug, + verbose=verbose, cwd=cwd, check=check, ) @@ -103,6 +105,7 @@ def __call__( output_dir: str | Path | None = None, temp_dir: str | Path | None = None, debug: bool = False, + verbose: bool = False, cwd: Path | None = None, check: bool = True, ) -> Path: @@ -124,8 +127,11 @@ def __call__( if debug: args += ['--debug'] + if verbose: + args += ['--verbose'] + try: - run_process(args, logger=_LOGGER, cwd=cwd, check=check) + proc_res = run_process(args, logger=_LOGGER, cwd=cwd, check=check) except CalledProcessError as err: raise RuntimeError( f'Command kompile exited with code {err.returncode} for: {self.base_args.main_file}', @@ -134,6 +140,9 @@ def __call__( err.returncode, ) from err + if proc_res.stdout: + print(proc_res.stdout.rstrip()) + definition_dir = output_dir if output_dir else Path(self.base_args.main_file.stem + '-kompiled') assert definition_dir.is_dir() return definition_dir diff --git a/src/pyk/proof/reachability.py b/src/pyk/proof/reachability.py index e0a02b870..2e2c0ab18 100644 --- a/src/pyk/proof/reachability.py +++ b/src/pyk/proof/reachability.py @@ -4,8 +4,8 @@ import logging from dataclasses import dataclass from itertools import chain -from multiprocessing.pool import ThreadPool as Pool from typing import TYPE_CHECKING +from multiprocessing.pool import ThreadPool as Pool from pyk.kore.rpc import LogEntry @@ -482,35 +482,40 @@ def advance_proof( ) -> KCFG: iterations = 0 - def _advance_from_node(nid: NodeIdLike) -> None: - node = self.proof.kcfg.node(nid) - if self._check_subsume(node): + def _advance_from_node(node: NodeIdLike) -> None: + curr_node = self.proof.kcfg.node(node) + if self._check_subsume(curr_node): return - if self._check_terminal(node): + if self._check_terminal(curr_node): return - if self._check_abstract(node): + if self._check_abstract(curr_node): return - if self._extract_branches is not None and len(self.proof.kcfg.splits(target_id=nid)) == 0: - branches = list(self._extract_branches(node.cterm)) + if self._extract_branches is not None and len(self.proof.kcfg.splits(target_id=curr_node.id)) == 0: + branches = list(self._extract_branches(curr_node.cterm)) if len(branches) > 0: - self.proof.kcfg.split_on_constraints(nid, branches) + self.proof.kcfg.split_on_constraints(curr_node.id, branches) _LOGGER.info( - f'Found {len(branches)} branches using heuristic for node {self.proof.id}: {shorten_hashes(nid)}: {[self.kcfg_explore.kprint.pretty_print(bc) for bc in branches]}' + f'Found {len(branches)} branches using heuristic for node {self.proof.id}: {shorten_hashes(curr_node.id)}: {[self.kcfg_explore.kprint.pretty_print(bc) for bc in branches]}' ) return + module_name = ( + self.circularities_module_name if self.nonzero_depth(curr_node) else self.dependencies_module_name + ) self.kcfg_explore.extend( self.proof.kcfg, - node, + curr_node, self.proof.logs, execute_depth=execute_depth, cut_point_rules=cut_point_rules, terminal_rules=terminal_rules, + module_name=module_name, ) + while self.proof.pending: self.proof.write_proof() @@ -521,7 +526,8 @@ def _advance_from_node(nid: NodeIdLike) -> None: curr_nodes = [node.id for ni, node in enumerate(self.proof.pending) if ni < max_workers] pool = Pool(processes=max_workers) - pool.map(_advance_from_node, curr_nodes) + res = pool.map_async(_advance_from_node, curr_nodes) + res.wait() self.proof.write_proof() return self.proof.kcfg @@ -684,7 +690,6 @@ def advance_proof( cut_point_rules: Iterable[str] = (), terminal_rules: Iterable[str] = (), implication_every_block: bool = True, - max_workers: int = 1, ) -> KCFG: iterations = 0 @@ -722,7 +727,6 @@ def advance_proof( cut_point_rules=cut_point_rules, terminal_rules=terminal_rules, implication_every_block=implication_every_block, - max_workers=max_workers, ) self.proof.write_proof() From 42a20985a3d0c690293659797f0089158974d3ce Mon Sep 17 00:00:00 2001 From: devops Date: Thu, 6 Jul 2023 21:14:50 +0000 Subject: [PATCH 15/20] Set Version: 0.1.360 --- package/version | 2 +- pyproject.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/package/version b/package/version index 8509341ee..785120566 100644 --- a/package/version +++ b/package/version @@ -1 +1 @@ -0.1.359 +0.1.360 diff --git a/pyproject.toml b/pyproject.toml index 096598aae..0bcff45a2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api" [tool.poetry] name = "pyk" -version = "0.1.359" +version = "0.1.360" description = "" authors = [ "Runtime Verification, Inc. ", From 8c98c4d27770e8a667bea6f1a816c09b7712c392 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Guyot?= Date: Thu, 6 Jul 2023 21:24:35 +0000 Subject: [PATCH 16/20] fmt --- src/pyk/kcfg/explore.py | 2 +- src/pyk/proof/reachability.py | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/src/pyk/kcfg/explore.py b/src/pyk/kcfg/explore.py index 307995596..4da49a90f 100644 --- a/src/pyk/kcfg/explore.py +++ b/src/pyk/kcfg/explore.py @@ -147,7 +147,7 @@ def _next_available_server(self) -> tuple[KoreServer, KoreClient]: i = (i + 1) % len(self._kore_clients) def _all_servers(self) -> list[tuple[KoreServer, KoreClient]]: - acquired = [] + acquired: list[tuple[KoreServer, KoreClient]] = [] while len(acquired) < len(self._kore_servers): acquired.append(self._next_available_server()) return acquired diff --git a/src/pyk/proof/reachability.py b/src/pyk/proof/reachability.py index 2e2c0ab18..aae116ab5 100644 --- a/src/pyk/proof/reachability.py +++ b/src/pyk/proof/reachability.py @@ -4,8 +4,8 @@ import logging from dataclasses import dataclass from itertools import chain -from typing import TYPE_CHECKING from multiprocessing.pool import ThreadPool as Pool +from typing import TYPE_CHECKING from pyk.kore.rpc import LogEntry @@ -515,7 +515,6 @@ def _advance_from_node(node: NodeIdLike) -> None: module_name=module_name, ) - while self.proof.pending: self.proof.write_proof() @@ -690,6 +689,7 @@ def advance_proof( cut_point_rules: Iterable[str] = (), terminal_rules: Iterable[str] = (), implication_every_block: bool = True, + max_workers: int = 1, ) -> KCFG: iterations = 0 @@ -727,6 +727,7 @@ def advance_proof( cut_point_rules=cut_point_rules, terminal_rules=terminal_rules, implication_every_block=implication_every_block, + max_workers=max_workers, ) self.proof.write_proof() From f1403f5574b6be470e88a51597fafef6fc93eaa3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Guyot?= Date: Mon, 10 Jul 2023 06:48:01 +0000 Subject: [PATCH 17/20] only pool necessary workers --- src/pyk/proof/reachability.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/pyk/proof/reachability.py b/src/pyk/proof/reachability.py index aae116ab5..dbf3f4f81 100644 --- a/src/pyk/proof/reachability.py +++ b/src/pyk/proof/reachability.py @@ -524,7 +524,7 @@ def _advance_from_node(node: NodeIdLike) -> None: iterations += 1 curr_nodes = [node.id for ni, node in enumerate(self.proof.pending) if ni < max_workers] - pool = Pool(processes=max_workers) + pool = Pool(processes=len(curr_nodes)) res = pool.map_async(_advance_from_node, curr_nodes) res.wait() From b1a718faaea82f923aa557f916d72c78eec4bdab Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Guyot?= Date: Mon, 10 Jul 2023 09:46:13 +0000 Subject: [PATCH 18/20] abstract over KoreServerPool --- src/pyk/kcfg/explore.py | 128 +++--------------- src/pyk/kore/rpc.py | 246 ++++++++++++++++++++++++++++++++-- src/pyk/proof/proof.py | 1 + src/pyk/proof/reachability.py | 2 + src/pyk/testing/_kompiler.py | 5 +- 5 files changed, 257 insertions(+), 125 deletions(-) diff --git a/src/pyk/kcfg/explore.py b/src/pyk/kcfg/explore.py index 4da49a90f..abcc225ba 100644 --- a/src/pyk/kcfg/explore.py +++ b/src/pyk/kcfg/explore.py @@ -18,7 +18,7 @@ ) from ..kast.outer import KRule from ..konvert import krule_to_kore -from ..kore.rpc import KoreClient, KoreServer, SatResult, StopReason, UnknownResult, UnsatResult +from ..kore.rpc import SatResult, StopReason, UnknownResult, UnsatResult from ..kore.syntax import Import, Module from ..ktool.kprove import KoreExecLogFormat from ..prelude import k @@ -35,10 +35,9 @@ from ..kast import KInner from ..kast.outer import KClaim - from ..kore.rpc import LogEntry + from ..kore.rpc import KoreServerBase, LogEntry from ..kore.syntax import Sentence from ..ktool.kprint import KPrint - from ..utils import BugReport from .kcfg import NodeIdLike @@ -48,47 +47,29 @@ class KCFGExplore(ContextManager['KCFGExplore']): kprint: KPrint id: str - _port: int | None - _kore_rpc_command: str | Iterable[str] - _smt_timeout: int | None - _smt_retry_limit: int | None - _bug_report: BugReport | None - - _kore_servers: list[KoreServer] = [] - _kore_clients: list[KoreClient] = [] - _rpc_closed: bool + _trace_rewrites: bool - _max_clients: int + + _kore_server: KoreServerBase def __init__( self, + kore_server: KoreServerBase, kprint: KPrint, *, id: str | None = None, - port: int | None = None, - kore_rpc_command: str | Iterable[str] = 'kore-rpc', - smt_timeout: int | None = None, - smt_retry_limit: int | None = None, - bug_report: BugReport | None = None, haskell_log_format: KoreExecLogFormat = KoreExecLogFormat.ONELINE, haskell_log_entries: Iterable[str] = (), log_axioms_file: Path | None = None, trace_rewrites: bool = False, - max_clients: int = 1, ): + self._kore_server = kore_server self.kprint = kprint self.id = id if id is not None else 'NO ID' - self._port = port - self._kore_rpc_command = kore_rpc_command - self._smt_timeout = smt_timeout - self._smt_retry_limit = smt_retry_limit - self._bug_report = bug_report self._haskell_log_format = haskell_log_format self._haskell_log_entries = haskell_log_entries self._log_axioms_file = log_axioms_file - self._rpc_closed = False self._trace_rewrites = trace_rewrites - self._max_clients = max_clients def __enter__(self) -> KCFGExplore: return self @@ -96,87 +77,8 @@ def __enter__(self) -> KCFGExplore: def __exit__(self, *args: Any) -> None: self.close() - @property - def _kore_rpc(self) -> tuple[KoreServer, KoreClient]: - if self._rpc_closed: - raise ValueError('RPC server already closed!') - curr_server = self._curr_server() # need interim because of lock - if curr_server is not None: - (server, client) = curr_server - elif len(self._kore_clients) < self._max_clients: - (server, client) = self._new_server() - client._lock.acquire() - self._kore_servers.append(server) - self._kore_clients.append(client) - else: - (server, client) = self._next_available_server() - return (server, client) - - @property - def _kore_rpcs(self) -> list[tuple[KoreServer, KoreClient]]: - if self._rpc_closed: - raise ValueError('RPC server already closed!') - res = self._all_servers() - new_servers = [] - while len(self._kore_servers) < self._max_clients: - (server, client) = self._new_server() - client._lock.acquire() - self._kore_servers.append(server) - self._kore_clients.append(client) - new_servers.append((server, client)) - return res + new_servers - - def _curr_server(self) -> tuple[KoreServer, KoreClient] | None: - i, client = next( - ( - (i, client) - for i, client in enumerate(self._kore_clients) - if self._kore_clients[i]._lock.acquire(blocking=False) - ), - (0, None), - ) - if client is not None: - return (self._kore_servers[i], self._kore_clients[i]) - return None - - def _next_available_server(self) -> tuple[KoreServer, KoreClient]: - i = 0 - while True: - if self._kore_clients[i]._lock.acquire(timeout=5): - return (self._kore_servers[i], self._kore_clients[i]) - i = (i + 1) % len(self._kore_clients) - - def _all_servers(self) -> list[tuple[KoreServer, KoreClient]]: - acquired: list[tuple[KoreServer, KoreClient]] = [] - while len(acquired) < len(self._kore_servers): - acquired.append(self._next_available_server()) - return acquired - - # TODO: don't use the same defined port for the KoreServer but a new one - def _new_server(self) -> tuple[KoreServer, KoreClient]: - server = KoreServer( - self.kprint.definition_dir, - self.kprint.main_module, - port=self._port, - bug_report=self._bug_report, - command=self._kore_rpc_command, - smt_timeout=self._smt_timeout, - smt_retry_limit=self._smt_retry_limit, - haskell_log_format=self._haskell_log_format, - haskell_log_entries=self._haskell_log_entries, - log_axioms_file=self._log_axioms_file, - ) - client = KoreClient('localhost', server._port, bug_report=self._bug_report) - return (server, client) - def close(self) -> None: - self._rpc_closed = True - while self._kore_servers: - server = self._kore_servers.pop() - server.close() - while self._kore_clients: - client = self._kore_clients.pop() - client.close() + self.close() def cterm_execute( self, @@ -188,7 +90,7 @@ def cterm_execute( ) -> tuple[int, CTerm, list[CTerm], tuple[LogEntry, ...]]: _LOGGER.debug(f'Executing: {cterm}') kore = self.kprint.kast_to_kore(cterm.kast, GENERATED_TOP_CELL) - _, kore_client = self._kore_rpc + _, kore_client = self._kore_server._kore_rpc er = kore_client.execute( kore, max_depth=depth, @@ -216,7 +118,7 @@ def cterm_execute( def cterm_simplify(self, cterm: CTerm) -> tuple[KInner, tuple[LogEntry, ...]]: _LOGGER.debug(f'Simplifying: {cterm}') kore = self.kprint.kast_to_kore(cterm.kast, GENERATED_TOP_CELL) - _, kore_client = self._kore_rpc + _, kore_client = self._kore_server._kore_rpc kore_simplified, logs = kore_client.simplify(kore) kast_simplified = self.kprint.kore_to_kast(kore_simplified) return kast_simplified, logs @@ -224,7 +126,7 @@ def cterm_simplify(self, cterm: CTerm) -> tuple[KInner, tuple[LogEntry, ...]]: def kast_simplify(self, kast: KInner) -> tuple[KInner, tuple[LogEntry, ...]]: _LOGGER.debug(f'Simplifying: {kast}') kore = self.kprint.kast_to_kore(kast, GENERATED_TOP_CELL) - _, kore_client = self._kore_rpc + _, kore_client = self._kore_server._kore_rpc kore_simplified, logs = kore_client.simplify(kore) kast_simplified = self.kprint.kore_to_kast(kore_simplified) return kast_simplified, logs @@ -232,7 +134,7 @@ def kast_simplify(self, kast: KInner) -> tuple[KInner, tuple[LogEntry, ...]]: def cterm_get_model(self, cterm: CTerm, module_name: str | None = None) -> Subst | None: _LOGGER.info(f'Getting model: {cterm}') kore = self.kprint.kast_to_kore(cterm.kast, GENERATED_TOP_CELL) - _, kore_client = self._kore_rpc + _, kore_client = self._kore_server._kore_rpc result = kore_client.get_model(kore, module_name=module_name) if type(result) is UnknownResult: _LOGGER.debug('Result is Unknown') @@ -270,7 +172,7 @@ def cterm_implies( _consequent = KApply(KLabel('#Exists', [GENERATED_TOP_CELL]), [KVariable(uc), _consequent]) antecedent_kore = self.kprint.kast_to_kore(antecedent.kast, GENERATED_TOP_CELL) consequent_kore = self.kprint.kast_to_kore(_consequent, GENERATED_TOP_CELL) - _, kore_client = self._kore_rpc + _, kore_client = self._kore_server._kore_rpc result = kore_client.implies(antecedent_kore, consequent_kore) if not result.satisfiable: if result.substitution is not None: @@ -379,7 +281,7 @@ def cterm_assume_defined(self, cterm: CTerm) -> CTerm: _LOGGER.debug(f'Computing definedness condition for: {cterm}') kast = KApply(KLabel('#Ceil', [GENERATED_TOP_CELL, GENERATED_TOP_CELL]), [cterm.config]) kore = self.kprint.kast_to_kore(kast, GENERATED_TOP_CELL) - _, kore_client = self._kore_rpc + _, kore_client = self._kore_server._kore_rpc kore_simplified, _logs = kore_client.simplify(kore) kast_simplified = self.kprint.kore_to_kast(kore_simplified) _LOGGER.debug(f'Definedness condition computed: {kast_simplified}') @@ -552,7 +454,7 @@ def add_dependencies_module( for c in dependencies ] kore_axioms: list[Sentence] = [krule_to_kore(self.kprint.kompiled_kore, r) for r in kast_rules] - for _, kore_client in self._kore_rpcs: + for _, kore_client in self._kore_server._kore_rpcs: sentences: list[Sentence] = [Import(module_name=old_module_name, attrs=())] sentences = sentences + kore_axioms m = Module(name=new_module_name, sentences=sentences) diff --git a/src/pyk/kore/rpc.py b/src/pyk/kore/rpc.py index a56a9e3fb..6a2197a4f 100644 --- a/src/pyk/kore/rpc.py +++ b/src/pyk/kore/rpc.py @@ -24,6 +24,7 @@ from collections.abc import Iterable, Mapping from typing import Any, ClassVar, Final, TextIO, TypeVar + from ..ktool.kprint import KPrint from ..utils import BugReport from .syntax import Module @@ -529,8 +530,6 @@ class KoreClient(ContextManager['KoreClient']): _client: JsonRpcClient - _lock: threading.Lock - def __init__(self, host: str, port: int, *, timeout: int | None = None, bug_report: BugReport | None = None): self._client = JsonRpcClient(host, port, timeout=timeout, bug_report=bug_report) self._lock = threading.Lock() @@ -545,18 +544,12 @@ def close(self) -> None: self._client.close() def _try_release(self) -> None: - try: - self._lock.release() - except RuntimeError: - pass + ... def _request(self, method: str, **params: Any) -> dict[str, Any]: try: - res = self._client.request(method, **params) - self._try_release() - return res + return self._client.request(method, **params) except JsonRpcError as err: - self._try_release() assert err.code not in {-32601, -32602}, 'Malformed Kore-RPC request' raise KoreClientError(message=err.message, code=err.code, data=err.data) from err @@ -651,6 +644,30 @@ def add_module(self, module: Module) -> None: assert result == [] +class ParKoreClient(KoreClient): + _lock: threading.Lock + + def __init__(self, host: str, port: int, *, timeout: int | None = None, bug_report: BugReport | None = None): + super().__init__(host, port, timeout=timeout, bug_report=bug_report) + self._lock = threading.Lock() + + def _try_release(self) -> None: + try: + self._lock.release() + except RuntimeError: + pass + + def _request(self, method: str, **params: Any) -> dict[str, Any]: + try: + res = self._client.request(method, **params) + self._try_release() + return res + except JsonRpcError as err: + self._try_release() + assert err.code not in {-32601, -32602}, 'Malformed Kore-RPC request' + raise KoreClientError(message=err.message, code=err.code, data=err.data) from err + + class KoreServer(ContextManager['KoreServer']): _proc: Popen _pid: int @@ -757,3 +774,212 @@ def close(self) -> None: self._proc.send_signal(SIGINT) self._proc.wait() _LOGGER.info(f'KoreServer stopped: {self.host}:{self.port}, pid={self.pid}') + + +class KoreServerBase(ABC): + kprint: KPrint + _port: int | None + _kore_rpc_command: str | Iterable[str] + _smt_timeout: int | None + _smt_retry_limit: int | None + _bug_report: BugReport | None + + _rpc_closed: bool + + def __init__( + self, + kprint: KPrint, + port: int | None = None, + kore_rpc_command: str | Iterable[str] = 'kore-rpc', + smt_timeout: int | None = None, + smt_retry_limit: int | None = None, + bug_report: BugReport | None = None, + haskell_log_format: KoreExecLogFormat = KoreExecLogFormat.ONELINE, + haskell_log_entries: Iterable[str] = (), + log_axioms_file: Path | None = None, + ): + self.kprint = kprint + self._port = port + self._kore_rpc_command = kore_rpc_command + self._smt_timeout = smt_timeout + self._smt_retry_limit = smt_retry_limit + self._bug_report = bug_report + self._haskell_log_format = haskell_log_format + self._haskell_log_entries = haskell_log_entries + self._log_axioms_file = log_axioms_file + self._rpc_closed = False + + @property + @abstractmethod + def _kore_rpc(self) -> tuple[KoreServer, KoreClient]: + ... + + @property + @abstractmethod + def _kore_rpcs(self) -> list[tuple[KoreServer, KoreClient]]: + ... + + def __exit__(self, *args: Any) -> None: + self.close() + + @abstractmethod + def close(self) -> None: + ... + + +class SingleKoreServer(KoreServerBase): + _kore_server: KoreServer | None = None + _kore_client: KoreClient | None = None + + @property + def _kore_rpc(self) -> tuple[KoreServer, KoreClient]: + if self._rpc_closed: + raise ValueError('RPC server already closed!') + print("server") + if not self._kore_server: + print("no server") + self._kore_server = KoreServer( + self.kprint.definition_dir, + self.kprint.main_module, + port=self._port, + bug_report=self._bug_report, + command=self._kore_rpc_command, + smt_timeout=self._smt_timeout, + smt_retry_limit=self._smt_retry_limit, + haskell_log_format=self._haskell_log_format, + haskell_log_entries=self._haskell_log_entries, + log_axioms_file=self._log_axioms_file, + ) + print("client") + if not self._kore_client: + print("no client") + self._kore_client = KoreClient('localhost', self._kore_server._port, bug_report=self._bug_report) + return (self._kore_server, self._kore_client) + + @property + def _kore_rpcs(self) -> list[tuple[KoreServer, KoreClient]]: + return [self._kore_rpc] + + def close(self) -> None: + self._rpc_closed = True + if self._kore_server is not None: + self._kore_server.close() + self._kore_server = None + if self._kore_client is not None: + self._kore_client.close() + self._kore_client = None + + +class KoreServerPool(KoreServerBase): + _kore_server: list[KoreServer] = [] + _kore_client: list[KoreClient] = [] + + _max_clients: int + + def __init__( + self, + kprint: KPrint, + port: int | None = None, + kore_rpc_command: str | Iterable[str] = 'kore-rpc', + smt_timeout: int | None = None, + smt_retry_limit: int | None = None, + bug_report: BugReport | None = None, + haskell_log_format: KoreExecLogFormat = KoreExecLogFormat.ONELINE, + haskell_log_entries: Iterable[str] = (), + log_axioms_file: Path | None = None, + max_clients: int = 1, + ): + super().__init__( + kprint, + port, + kore_rpc_command, + smt_timeout, + smt_retry_limit, + bug_report, + haskell_log_format, + haskell_log_entries, + log_axioms_file, + ) + self._max_clients = max_clients + + @property + def _kore_rpc(self) -> tuple[KoreServer, KoreClient]: + if self._rpc_closed: + raise ValueError('RPC server already closed!') + curr_server = self._curr_server() # need interim because of lock + if curr_server is not None: + (server, client) = curr_server + elif len(self._kore_client) < self._max_clients: + (server, client) = self._new_server() + client._lock.acquire() + self._kore_server.append(server) + self._kore_client.append(client) + else: + (server, client) = self._next_available_server() + return (server, client) + + @property + def _kore_rpcs(self) -> list[tuple[KoreServer, KoreClient]]: + if self._rpc_closed: + raise ValueError('RPC server already closed!') + res = self._all_servers() + new_servers = [] + while len(self._kore_server) < self._max_clients: + (server, client) = self._new_server() + client._lock.acquire() + self._kore_server.append(server) + self._kore_client.append(client) + new_servers.append((server, client)) + return res + new_servers + + def _curr_server(self) -> tuple[KoreServer, KoreClient] | None: + i, client = next( + ( + (i, client) + for i, client in enumerate(self._kore_client) + if self._kore_client[i]._lock.acquire(blocking=False) + ), + (0, None), + ) + if client is not None: + return (self._kore_server[i], self._kore_client[i]) + return None + + def _next_available_server(self) -> tuple[KoreServer, KoreClient]: + i = 0 + while True: + if self._kore_client[i]._lock.acquire(timeout=5): + return (self._kore_server[i], self._kore_client[i]) + i = (i + 1) % len(self._kore_client) + + def _all_servers(self) -> list[tuple[KoreServer, KoreClient]]: + acquired: list[tuple[KoreServer, KoreClient]] = [] + while len(acquired) < len(self._kore_server): + acquired.append(self._next_available_server()) + return acquired + + # TODO: don't use the same defined port for the KoreServer but a new one + def _new_server(self) -> tuple[KoreServer, KoreClient]: + server = KoreServer( + self.kprint.definition_dir, + self.kprint.main_module, + port=self._port, + bug_report=self._bug_report, + command=self._kore_rpc_command, + smt_timeout=self._smt_timeout, + smt_retry_limit=self._smt_retry_limit, + haskell_log_format=self._haskell_log_format, + haskell_log_entries=self._haskell_log_entries, + log_axioms_file=self._log_axioms_file, + ) + client = ParKoreClient('localhost', server._port, bug_report=self._bug_report) + return (server, client) + + def close(self) -> None: + self._rpc_closed = True + while self._kore_server: + server = self._kore_server.pop() + server.close() + while self._kore_client: + client = self._kore_client.pop() + client.close() diff --git a/src/pyk/proof/proof.py b/src/pyk/proof/proof.py index 4f5456c48..97052d6fa 100644 --- a/src/pyk/proof/proof.py +++ b/src/pyk/proof/proof.py @@ -67,6 +67,7 @@ def write_proof(self, subproofs: bool = False) -> None: proof_json = json.dumps(self.dict) proof_path.write_text(proof_json) _LOGGER.info(f'Updated proof file {self.id}: {proof_path}') + print("aft") if subproofs: for sp in self.subproofs: sp.write_proof(subproofs=subproofs) diff --git a/src/pyk/proof/reachability.py b/src/pyk/proof/reachability.py index dbf3f4f81..5cb5ed8de 100644 --- a/src/pyk/proof/reachability.py +++ b/src/pyk/proof/reachability.py @@ -516,7 +516,9 @@ def _advance_from_node(node: NodeIdLike) -> None: ) while self.proof.pending: + print("w") self.proof.write_proof() + print("z") if max_iterations is not None and max_iterations <= iterations: _LOGGER.warning(f'Reached iteration bound {self.proof.id}: {max_iterations}') diff --git a/src/pyk/testing/_kompiler.py b/src/pyk/testing/_kompiler.py index e6da48281..1821da996 100644 --- a/src/pyk/testing/_kompiler.py +++ b/src/pyk/testing/_kompiler.py @@ -8,7 +8,7 @@ from ..kcfg import KCFGExplore from ..kllvm.compiler import compile_runtime from ..kllvm.importer import import_runtime -from ..kore.rpc import KoreClient, KoreServer +from ..kore.rpc import KoreClient, KoreServer, SingleKoreServer from ..ktool.kompile import Kompile from ..ktool.kprint import KPrint from ..ktool.kprove import KProve @@ -128,9 +128,10 @@ def _update_symbol_table(symbol_table: SymbolTable) -> None: class KCFGExploreTest(KProveTest): @pytest.fixture def kcfg_explore(self, kprove: KProve) -> Iterator[KCFGExplore]: + server = SingleKoreServer(kprove) with KCFGExplore( + server, kprove, - bug_report=kprove._bug_report, ) as kcfg_explore: yield kcfg_explore From 6acd860df58594d3456f675bec49cb46f5521073 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Guyot?= Date: Mon, 10 Jul 2023 09:48:11 +0000 Subject: [PATCH 19/20] format --- src/pyk/kore/rpc.py | 4 ---- src/pyk/proof/proof.py | 1 - src/pyk/proof/reachability.py | 2 -- 3 files changed, 7 deletions(-) diff --git a/src/pyk/kore/rpc.py b/src/pyk/kore/rpc.py index 6a2197a4f..2d26927ee 100644 --- a/src/pyk/kore/rpc.py +++ b/src/pyk/kore/rpc.py @@ -835,9 +835,7 @@ class SingleKoreServer(KoreServerBase): def _kore_rpc(self) -> tuple[KoreServer, KoreClient]: if self._rpc_closed: raise ValueError('RPC server already closed!') - print("server") if not self._kore_server: - print("no server") self._kore_server = KoreServer( self.kprint.definition_dir, self.kprint.main_module, @@ -850,9 +848,7 @@ def _kore_rpc(self) -> tuple[KoreServer, KoreClient]: haskell_log_entries=self._haskell_log_entries, log_axioms_file=self._log_axioms_file, ) - print("client") if not self._kore_client: - print("no client") self._kore_client = KoreClient('localhost', self._kore_server._port, bug_report=self._bug_report) return (self._kore_server, self._kore_client) diff --git a/src/pyk/proof/proof.py b/src/pyk/proof/proof.py index 97052d6fa..4f5456c48 100644 --- a/src/pyk/proof/proof.py +++ b/src/pyk/proof/proof.py @@ -67,7 +67,6 @@ def write_proof(self, subproofs: bool = False) -> None: proof_json = json.dumps(self.dict) proof_path.write_text(proof_json) _LOGGER.info(f'Updated proof file {self.id}: {proof_path}') - print("aft") if subproofs: for sp in self.subproofs: sp.write_proof(subproofs=subproofs) diff --git a/src/pyk/proof/reachability.py b/src/pyk/proof/reachability.py index 5cb5ed8de..dbf3f4f81 100644 --- a/src/pyk/proof/reachability.py +++ b/src/pyk/proof/reachability.py @@ -516,9 +516,7 @@ def _advance_from_node(node: NodeIdLike) -> None: ) while self.proof.pending: - print("w") self.proof.write_proof() - print("z") if max_iterations is not None and max_iterations <= iterations: _LOGGER.warning(f'Reached iteration bound {self.proof.id}: {max_iterations}') From fb6e25e87797dfa17586267efccd08b8bcc7d68e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Guyot?= Date: Mon, 10 Jul 2023 09:51:14 +0000 Subject: [PATCH 20/20] write bug_report back in test --- src/pyk/testing/_kompiler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/pyk/testing/_kompiler.py b/src/pyk/testing/_kompiler.py index 1821da996..d0859f65e 100644 --- a/src/pyk/testing/_kompiler.py +++ b/src/pyk/testing/_kompiler.py @@ -128,7 +128,7 @@ def _update_symbol_table(symbol_table: SymbolTable) -> None: class KCFGExploreTest(KProveTest): @pytest.fixture def kcfg_explore(self, kprove: KProve) -> Iterator[KCFGExplore]: - server = SingleKoreServer(kprove) + server = SingleKoreServer(kprove, bug_report=kprove._bug_report) with KCFGExplore( server, kprove,