diff --git a/src/pip/_internal/cli/base_command.py b/src/pip/_internal/cli/base_command.py index 78b96bb7070..0774f26081f 100644 --- a/src/pip/_internal/cli/base_command.py +++ b/src/pip/_internal/cli/base_command.py @@ -168,7 +168,7 @@ def exc_logging_wrapper(*args: Any) -> int: assert isinstance(status, int) return status except DiagnosticPipError as exc: - logger.error("[present-diagnostic] %s", exc) + logger.error("[present-rich] %s", exc) logger.debug("Exception information:", exc_info=True) return ERROR diff --git a/src/pip/_internal/self_outdated_check.py b/src/pip/_internal/self_outdated_check.py index 7300e0ea4c0..ad62dd27b00 100644 --- a/src/pip/_internal/self_outdated_check.py +++ b/src/pip/_internal/self_outdated_check.py @@ -1,23 +1,34 @@ import datetime +import functools import hashlib import json import logging import optparse import os.path import sys -from typing import Any, Dict +from dataclasses import dataclass +from typing import Any, Callable, Dict, Optional from pip._vendor.packaging.version import parse as parse_version +from pip._vendor.rich.console import Group +from pip._vendor.rich.markup import escape +from pip._vendor.rich.text import Text from pip._internal.index.collector import LinkCollector from pip._internal.index.package_finder import PackageFinder from pip._internal.metadata import get_default_environment +from pip._internal.metadata.base import DistributionVersion from pip._internal.models.selection_prefs import SelectionPreferences from pip._internal.network.session import PipSession +from pip._internal.utils.compat import WINDOWS +from pip._internal.utils.entrypoints import ( + get_best_invocation_for_this_pip, + get_best_invocation_for_this_python, +) from pip._internal.utils.filesystem import adjacent_tmp_file, check_path_owner, replace from pip._internal.utils.misc import ensure_dir -SELFCHECK_DATE_FMT = "%Y-%m-%dT%H:%M:%SZ" +_DATE_FMT = "%Y-%m-%dT%H:%M:%SZ" logger = logging.getLogger(__name__) @@ -31,17 +42,17 @@ def _get_statefile_name(key: str) -> str: class SelfCheckState: def __init__(self, cache_dir: str) -> None: - self.state: Dict[str, Any] = {} - self.statefile_path = None + self._state: Dict[str, Any] = {} + self._statefile_path = None # Try to load the existing state if cache_dir: - self.statefile_path = os.path.join( + self._statefile_path = os.path.join( cache_dir, "selfcheck", _get_statefile_name(self.key) ) try: - with open(self.statefile_path, encoding="utf-8") as statefile: - self.state = json.load(statefile) + with open(self._statefile_path, encoding="utf-8") as statefile: + self._state = json.load(statefile) except (OSError, ValueError, KeyError): # Explicitly suppressing exceptions, since we don't want to # error out if the cache file is invalid. @@ -51,41 +62,87 @@ def __init__(self, cache_dir: str) -> None: def key(self) -> str: return sys.prefix - def save(self, pypi_version: str, current_time: datetime.datetime) -> None: + def get(self, current_time: datetime.datetime) -> Optional[str]: + """Check if we have a not-outdated version loaded already.""" + if not self._state: + return None + + if "last_check" not in self._state: + return None + + if "pypi_version" not in self._state: + return None + + seven_days_in_seconds = 7 * 24 * 60 * 60 + + # Determine if we need to refresh the state + last_check = datetime.datetime.strptime(self._state["last_check"], _DATE_FMT) + seconds_since_last_check = (current_time - last_check).total_seconds() + if seconds_since_last_check > seven_days_in_seconds: + return None + + return self._state["pypi_version"] + + def set(self, pypi_version: str, current_time: datetime.datetime) -> None: # If we do not have a path to cache in, don't bother saving. - if not self.statefile_path: + if not self._statefile_path: return # Check to make sure that we own the directory - if not check_path_owner(os.path.dirname(self.statefile_path)): + if not check_path_owner(os.path.dirname(self._statefile_path)): return # Now that we've ensured the directory is owned by this user, we'll go # ahead and make sure that all our directories are created. - ensure_dir(os.path.dirname(self.statefile_path)) + ensure_dir(os.path.dirname(self._statefile_path)) state = { # Include the key so it's easy to tell which pip wrote the # file. "key": self.key, - "last_check": current_time.strftime(SELFCHECK_DATE_FMT), + "last_check": current_time.strftime(_DATE_FMT), "pypi_version": pypi_version, } text = json.dumps(state, sort_keys=True, separators=(",", ":")) - with adjacent_tmp_file(self.statefile_path) as f: + with adjacent_tmp_file(self._statefile_path) as f: f.write(text.encode()) try: # Since we have a prefix-specific state file, we can just # overwrite whatever is there, no need to check. - replace(f.name, self.statefile_path) + replace(f.name, self._statefile_path) except OSError: # Best effort. pass +@dataclass +class UpgradePrompt: + old: str + new: str + + def __rich__(self) -> Group: + if WINDOWS: + pip_cmd = f"{get_best_invocation_for_this_python()} -m pip" + else: + pip_cmd = get_best_invocation_for_this_pip() + + notice = "[bold][[reset][blue]notice[reset][bold]][reset]" + return Group( + Text(), + Text.from_markup( + f"{notice} A new release of pip available: " + f"[red]{self.old}[reset] -> [green]{self.new}[reset]" + ), + Text.from_markup( + f"{notice} To update, run: " + f"[green]{escape(pip_cmd)} install --upgrade pip" + ), + ) + + def was_installed_by_pip(pkg: str) -> bool: """Checks whether pkg was installed by pip @@ -96,6 +153,66 @@ def was_installed_by_pip(pkg: str) -> bool: return dist is not None and "pip" == dist.installer +def _get_current_remote_pip_version( + session: PipSession, options: optparse.Values +) -> str: + # Lets use PackageFinder to see what the latest pip version is + link_collector = LinkCollector.create( + session, + options=options, + suppress_no_index=True, + ) + + # Pass allow_yanked=False so we don't suggest upgrading to a + # yanked version. + selection_prefs = SelectionPreferences( + allow_yanked=False, + allow_all_prereleases=False, # Explicitly set to False + ) + + finder = PackageFinder.create( + link_collector=link_collector, + selection_prefs=selection_prefs, + use_deprecated_html5lib=("html5lib" in options.deprecated_features_enabled), + ) + best_candidate = finder.find_best_candidate("pip").best_candidate + if best_candidate is None: + return + + return str(best_candidate.version) + + +def _self_version_check_logic( + *, + state: SelfCheckState, + current_time: datetime.datetime, + local_version: DistributionVersion, + get_remote_version: Callable[[], str], +) -> Optional[UpgradePrompt]: + remote_version_str = state.get(current_time) + if remote_version_str is None: + remote_version_str = get_remote_version() + state.set(remote_version_str, current_time) + + remote_version = parse_version(remote_version_str) + logger.debug("Remote version of pip: %s", remote_version) + logger.debug("Local version of pip: %s", local_version) + + pip_installed_by_pip = was_installed_by_pip("pip") + logger.debug("Was pip installed by pip? %s", pip_installed_by_pip) + if not pip_installed_by_pip: + return None # Only suggest upgrade if pip is installed by pip. + + local_version_is_older = ( + local_version < remote_version + and local_version.base_version != remote_version.base_version + ) + if local_version_is_older: + return UpgradePrompt(old=str(local_version), new=remote_version_str) + + return None + + def pip_self_version_check(session: PipSession, options: optparse.Values) -> None: """Check for an update for pip. @@ -107,83 +224,17 @@ def pip_self_version_check(session: PipSession, options: optparse.Values) -> Non if not installed_dist: return - pip_version = installed_dist.version - pypi_version = None - try: - state = SelfCheckState(cache_dir=options.cache_dir) - - current_time = datetime.datetime.utcnow() - # Determine if we need to refresh the state - if "last_check" in state.state and "pypi_version" in state.state: - last_check = datetime.datetime.strptime( - state.state["last_check"], SELFCHECK_DATE_FMT - ) - if (current_time - last_check).total_seconds() < 7 * 24 * 60 * 60: - pypi_version = state.state["pypi_version"] - - # Refresh the version if we need to or just see if we need to warn - if pypi_version is None: - # Lets use PackageFinder to see what the latest pip version is - link_collector = LinkCollector.create( - session, - options=options, - suppress_no_index=True, - ) - - # Pass allow_yanked=False so we don't suggest upgrading to a - # yanked version. - selection_prefs = SelectionPreferences( - allow_yanked=False, - allow_all_prereleases=False, # Explicitly set to False - ) - - finder = PackageFinder.create( - link_collector=link_collector, - selection_prefs=selection_prefs, - use_deprecated_html5lib=( - "html5lib" in options.deprecated_features_enabled - ), - ) - best_candidate = finder.find_best_candidate("pip").best_candidate - if best_candidate is None: - return - pypi_version = str(best_candidate.version) - - # save that we've performed a check - state.save(pypi_version, current_time) - - remote_version = parse_version(pypi_version) - - local_version_is_older = ( - pip_version < remote_version - and pip_version.base_version != remote_version.base_version - and was_installed_by_pip("pip") - ) - - # Determine if our pypi_version is older - if not local_version_is_older: - return - - # We cannot tell how the current pip is available in the current - # command context, so be pragmatic here and suggest the command - # that's always available. This does not accommodate spaces in - # `sys.executable` on purpose as it is not possible to do it - # correctly without knowing the user's shell. Thus, - # it won't be done until possible through the standard library. - # Do not be tempted to use the undocumented subprocess.list2cmdline. - # It is considered an internal implementation detail for a reason. - pip_cmd = f"{sys.executable} -m pip" - logger.warning( - "You are using pip version %s; however, version %s is " - "available.\nYou should consider upgrading via the " - "'%s install --upgrade pip' command.", - pip_version, - pypi_version, - pip_cmd, + upgrade_prompt = _self_version_check_logic( + state=SelfCheckState(cache_dir=options.cache_dir), + current_time=datetime.datetime.utcnow(), + local_version=installed_dist.version, + get_remote_version=functools.partial( + _get_current_remote_pip_version, session, options + ), ) + if upgrade_prompt is not None: + logger.info("[present-rich] %s", upgrade_prompt) except Exception: - logger.debug( - "There was an error checking the latest version of pip", - exc_info=True, - ) + logger.warning("There was an error checking the latest version of pip.") + logger.debug("See below for error", exc_info=True) diff --git a/src/pip/_internal/utils/entrypoints.py b/src/pip/_internal/utils/entrypoints.py index 1504a12916b..f292c64045b 100644 --- a/src/pip/_internal/utils/entrypoints.py +++ b/src/pip/_internal/utils/entrypoints.py @@ -1,7 +1,23 @@ +import itertools +import os +import shutil import sys from typing import List, Optional from pip._internal.cli.main import main +from pip._internal.utils.compat import WINDOWS + +_EXECUTABLE_NAMES = [ + "pip", + f"pip{sys.version_info.major}", + f"pip{sys.version_info.major}.{sys.version_info.minor}", +] +if WINDOWS: + _allowed_extensions = {"", ".exe"} + _EXECUTABLE_NAMES = [ + "".join(parts) + for parts in itertools.product(_EXECUTABLE_NAMES, _allowed_extensions) + ] def _wrapper(args: Optional[List[str]] = None) -> int: @@ -25,3 +41,39 @@ def _wrapper(args: Optional[List[str]] = None) -> int: "running pip directly.\n" ) return main(args) + + +def get_best_invocation_for_this_pip() -> str: + """Try to figure out the best way to invoke pip in the current environment.""" + binary_directory = "Scripts" if WINDOWS else "bin" + binary_prefix = os.path.join(sys.prefix, binary_directory) + + # Try to use pip[X[.Y]] names, if those executables for this environment are + # the first on PATH with that name. + path_parts = os.path.normcase(os.environ.get("PATH", "")).split(os.pathsep) + exe_are_in_PATH = os.path.normcase(binary_prefix) in path_parts + if exe_are_in_PATH: + for exe_name in _EXECUTABLE_NAMES: + found_executable = shutil.which(exe_name) + if found_executable and os.path.samefile( + found_executable, + os.path.join(binary_prefix, exe_name), + ): + return exe_name + + # Use the `-m` invocation, if there's no "nice" invocation. + return f"{get_best_invocation_for_this_python()} -m pip" + + +def get_best_invocation_for_this_python() -> str: + """Try to figure out the best way to invoke the current Python.""" + exe = sys.executable + exe_name = os.path.basename(exe) + + # Try to use the basename, if it's the first executable. + found_executable = shutil.which(exe_name) + if found_executable and os.path.samefile(found_executable, exe): + return exe_name + + # Use the full executable name, because we couldn't find something simpler. + return exe diff --git a/src/pip/_internal/utils/logging.py b/src/pip/_internal/utils/logging.py index e6b7d5dd5d7..d6e991b89ec 100644 --- a/src/pip/_internal/utils/logging.py +++ b/src/pip/_internal/utils/logging.py @@ -21,7 +21,6 @@ from pip._vendor.rich.segment import Segment from pip._vendor.rich.style import Style -from pip._internal.exceptions import DiagnosticPipError from pip._internal.utils._log import VERBOSE, getLogger from pip._internal.utils.compat import WINDOWS from pip._internal.utils.deprecation import DEPRECATION_MSG_PREFIX @@ -154,12 +153,14 @@ def emit(self, record: logging.LogRecord) -> None: # If we are given a diagnostic error to present, present it with indentation. assert isinstance(record.args, tuple) - if record.msg == "[present-diagnostic] %s" and len(record.args) == 1: - diagnostic_error = record.args[0] - assert isinstance(diagnostic_error, DiagnosticPipError) + if record.msg == "[present-rich] %s" and len(record.args) == 1: + rich_renderable = record.args[0] + assert isinstance( + rich_renderable, ConsoleRenderable + ), f"{rich_renderable} is not rich-console-renderable" renderable: ConsoleRenderable = IndentedRenderable( - diagnostic_error, indent=get_indentation() + rich_renderable, indent=get_indentation() ) else: message = self.format(record) diff --git a/src/pip/_internal/utils/subprocess.py b/src/pip/_internal/utils/subprocess.py index 69c01c7ddcd..cf5bf6be1f6 100644 --- a/src/pip/_internal/utils/subprocess.py +++ b/src/pip/_internal/utils/subprocess.py @@ -209,7 +209,7 @@ def call_subprocess( output_lines=all_output if not showing_subprocess else None, ) if log_failed_cmd: - subprocess_logger.error("[present-diagnostic] %s", error) + subprocess_logger.error("[present-rich] %s", error) subprocess_logger.verbose( "[bold magenta]full command[/]: [blue]%s[/]", escape(format_command_args(cmd)), diff --git a/tests/unit/test_self_check_outdated.py b/tests/unit/test_self_check_outdated.py index d41de249e19..edfc4e82f19 100644 --- a/tests/unit/test_self_check_outdated.py +++ b/tests/unit/test_self_check_outdated.py @@ -1,257 +1,179 @@ import datetime -import functools import json -import os +import logging import sys -from typing import Any, Optional, cast -from unittest import mock +from optparse import Values +from typing import Optional +from unittest.mock import ANY, Mock, patch -import freezegun import pytest -from pip._vendor.packaging.version import parse as parse_version +from freezegun import freeze_time +from pip._vendor.packaging.version import Version from pip._internal import self_outdated_check -from pip._internal.models.candidate import InstallationCandidate -from pip._internal.models.link import Link -from pip._internal.network.session import PipSession -from pip._internal.self_outdated_check import ( - SelfCheckState, - logger, - pip_self_version_check, -) from tests.lib.path import Path -class MockBestCandidateResult: - def __init__(self, best: InstallationCandidate) -> None: - self.best_candidate = best - - -class MockPackageFinder: - - BASE_URL = "https://pypi.org/simple/pip-{0}.tar.gz" - PIP_PROJECT_NAME = "pip" - INSTALLATION_CANDIDATES = [ - InstallationCandidate( - PIP_PROJECT_NAME, - "6.9.0", - Link(BASE_URL.format("6.9.0")), - ), - InstallationCandidate( - PIP_PROJECT_NAME, - "3.3.1", - Link(BASE_URL.format("3.3.1")), +@pytest.mark.parametrize( + ["key", "expected"], + [ + ( + "/hello/world/venv", + "fcd2d5175dd33d5df759ee7b045264230205ef837bf9f582f7c3ada7", ), - InstallationCandidate( - PIP_PROJECT_NAME, - "1.0", - Link(BASE_URL.format("1.0")), + ( + "C:\\Users\\User\\Desktop\\venv", + "902cecc0745b8ecf2509ba473f3556f0ba222fedc6df433acda24aa5", ), - ] - - @classmethod - def create(cls, *args: Any, **kwargs: Any) -> "MockPackageFinder": - return cls() - - def find_best_candidate(self, project_name: str) -> MockBestCandidateResult: - return MockBestCandidateResult(self.INSTALLATION_CANDIDATES[0]) - - -class MockDistribution: - def __init__(self, installer: str, version: str) -> None: - self.installer = installer - self.version = parse_version(version) - - -class MockEnvironment: - def __init__(self, installer: str, installed_version: Optional[str]) -> None: - self.installer = installer - self.installed_version = installed_version - - def get_distribution(self, name: str) -> Optional[MockDistribution]: - if self.installed_version is None: - return None - return MockDistribution(self.installer, self.installed_version) + ], +) +def test_get_statefile_name_known_values(key: str, expected: str) -> None: + assert expected == self_outdated_check._get_statefile_name(key) -def _options() -> mock.Mock: - """Some default options that we pass to - self_outdated_check.pip_self_version_check""" - return mock.Mock( - find_links=[], - index_url="default_url", - extra_index_urls=[], - no_index=False, - pre=False, - cache_dir="", - deprecated_features_enabled=[], +@freeze_time("1970-01-02T11:00:00Z") +@patch("pip._internal.self_outdated_check._self_version_check_logic") +@patch("pip._internal.self_outdated_check.SelfCheckState") +def test_pip_self_version_check_calls_underlying_implementation( + mocked_state: Mock, mocked_function: Mock, tmpdir: Path +) -> None: + # GIVEN + mock_session = Mock() + fake_options = Values(dict(cache_dir=str(tmpdir))) + + # WHEN + self_outdated_check.pip_self_version_check(mock_session, fake_options) + + # THEN + mocked_state.assert_called_once_with(cache_dir=str(tmpdir)) + mocked_function.assert_called_once_with( + state=mocked_state(cache_dir=str(tmpdir)), + current_time=datetime.datetime(1970, 1, 2, 11, 0, 0), + local_version=ANY, + get_remote_version=ANY, ) @pytest.mark.parametrize( [ - "stored_time", - "installed_ver", - "new_ver", - "installer", - "check_if_upgrade_required", - "check_warn_logs", + "installed_version", + "remote_version", + "stored_version", + "installed_by_pip", + "should_show_prompt", ], [ - # Test we return None when installed version is None - ("1970-01-01T10:00:00Z", None, "1.0", "pip", False, False), - # Need an upgrade - upgrade warning should print - ("1970-01-01T10:00:00Z", "1.0", "6.9.0", "pip", True, True), - # Upgrade available, pip installed via rpm - warning should not print - ("1970-01-01T10:00:00Z", "1.0", "6.9.0", "rpm", True, False), - # No upgrade - upgrade warning should not print - ("1970-01-9T10:00:00Z", "6.9.0", "6.9.0", "pip", False, False), + # A newer version available! + ("1.0", "2.0", None, True, True), + # A newer version available, and cached value is new too! + ("1.0", "2.0", "2.0", True, True), + # A newer version available, but was not installed by pip. + ("1.0", "2.0", None, False, False), + # On the latest version already. + ("2.0", "2.0", None, True, False), + # On the latest version already, and cached value matches. + ("2.0", "2.0", "2.0", True, False), + # A newer version available, but cached value is older. + ("1.0", "2.0", "1.0", True, False), ], ) -def test_pip_self_version_check( +def test_core_logic( + installed_version: str, + remote_version: str, + stored_version: Optional[str], + installed_by_pip: bool, + should_show_prompt: bool, + caplog: pytest.LogCaptureFixture, monkeypatch: pytest.MonkeyPatch, - stored_time: str, - installed_ver: Optional[str], - new_ver: str, - installer: str, - check_if_upgrade_required: bool, - check_warn_logs: bool, ) -> None: + # GIVEN monkeypatch.setattr( - self_outdated_check, - "get_default_environment", - functools.partial(MockEnvironment, installer, installed_ver), - ) - monkeypatch.setattr( - self_outdated_check, - "PackageFinder", - MockPackageFinder, - ) - monkeypatch.setattr(logger, "warning", mock.Mock()) - monkeypatch.setattr(logger, "debug", mock.Mock()) - - fake_state = mock.Mock( - state={"last_check": stored_time, "pypi_version": installed_ver}, - save=mock.Mock(), + self_outdated_check, "was_installed_by_pip", lambda _: installed_by_pip ) - monkeypatch.setattr(self_outdated_check, "SelfCheckState", lambda **kw: fake_state) - - with freezegun.freeze_time( - "1970-01-09 10:00:00", - ignore=[ - "six.moves", - "pip._vendor.six.moves", - "pip._vendor.requests.packages.urllib3.packages.six.moves", - ], - ): - pip_self_version_check(PipSession(), _options()) - - # See that we saved the correct version - if check_if_upgrade_required: - assert fake_state.save.call_args_list == [ - mock.call(new_ver, datetime.datetime(1970, 1, 9, 10, 00, 00)), - ] - elif installed_ver: - # Make sure no Exceptions - assert not cast(mock.Mock, logger.debug).call_args_list - # See that save was not called - assert fake_state.save.call_args_list == [] + mock_state = Mock() + mock_state.get.return_value = stored_version + fake_time = datetime.datetime(2000, 1, 1, 0, 0, 0) + version_that_should_be_checked = stored_version or remote_version + + # WHEN + with caplog.at_level(logging.DEBUG): + return_value = self_outdated_check._self_version_check_logic( + state=mock_state, + current_time=fake_time, + local_version=Version(installed_version), + get_remote_version=lambda: remote_version, + ) + + # THEN + mock_state.get.assert_called_once_with(fake_time) + assert caplog.messages == [ + f"Remote version of pip: {version_that_should_be_checked}", + f"Local version of pip: {installed_version}", + f"Was pip installed by pip? {installed_by_pip}", + ] - # Ensure we warn the user or not - if check_warn_logs: - assert cast(mock.Mock, logger.warning).call_count == 1 + if stored_version: + mock_state.set.assert_not_called() else: - assert cast(mock.Mock, logger.warning).call_count == 0 - - -statefile_name_case_1 = "fcd2d5175dd33d5df759ee7b045264230205ef837bf9f582f7c3ada7" - -statefile_name_case_2 = "902cecc0745b8ecf2509ba473f3556f0ba222fedc6df433acda24aa5" - - -@pytest.mark.parametrize( - "key,expected", - [ - ("/hello/world/venv", statefile_name_case_1), - ("C:\\Users\\User\\Desktop\\venv", statefile_name_case_2), - ], -) -def test_get_statefile_name_known_values(key: str, expected: str) -> None: - assert expected == self_outdated_check._get_statefile_name(key) - - -def _get_statefile_path(cache_dir: str, key: str) -> str: - return os.path.join( - cache_dir, "selfcheck", self_outdated_check._get_statefile_name(key) - ) - - -def test_self_check_state_no_cache_dir() -> None: - state = SelfCheckState(cache_dir="") - assert state.state == {} - assert state.statefile_path is None - - -def test_self_check_state_key_uses_sys_prefix(monkeypatch: pytest.MonkeyPatch) -> None: - key = "helloworld" - - monkeypatch.setattr(sys, "prefix", key) - state = self_outdated_check.SelfCheckState("") - - assert state.key == key - - -def test_self_check_state_reads_expected_statefile( - monkeypatch: pytest.MonkeyPatch, tmpdir: Path -) -> None: - cache_dir = tmpdir / "cache_dir" - cache_dir.mkdir() - key = "helloworld" - statefile_path = _get_statefile_path(str(cache_dir), key) - - last_check = "1970-01-02T11:00:00Z" - pypi_version = "1.0" - content = { - "key": key, - "last_check": last_check, - "pypi_version": pypi_version, - } - - Path(statefile_path).parent.mkdir() - - with open(statefile_path, "w") as f: - json.dump(content, f) - - monkeypatch.setattr(sys, "prefix", key) - state = self_outdated_check.SelfCheckState(str(cache_dir)) - - assert state.state["last_check"] == last_check - assert state.state["pypi_version"] == pypi_version - - -def test_self_check_state_writes_expected_statefile( - monkeypatch: pytest.MonkeyPatch, tmpdir: Path -) -> None: - cache_dir = tmpdir / "cache_dir" - cache_dir.mkdir() - key = "helloworld" - statefile_path = _get_statefile_path(str(cache_dir), key) - - last_check = datetime.datetime.strptime( - "1970-01-02T11:00:00Z", self_outdated_check.SELFCHECK_DATE_FMT - ) - pypi_version = "1.0" - - monkeypatch.setattr(sys, "prefix", key) - state = self_outdated_check.SelfCheckState(str(cache_dir)) - - state.save(pypi_version, last_check) - with open(statefile_path) as f: - saved = json.load(f) - - expected = { - "key": key, - "last_check": last_check.strftime(self_outdated_check.SELFCHECK_DATE_FMT), - "pypi_version": pypi_version, - } - assert expected == saved + mock_state.set.assert_called_once_with( + version_that_should_be_checked, fake_time + ) + + if not should_show_prompt: + assert return_value is None + return # the remaining assertions are for the other case. + + assert return_value is not None + assert return_value.old == installed_version + assert return_value.new == remote_version + + +class TestSelfCheckState: + def test_no_cache(self) -> None: + # GIVEN / WHEN + state = self_outdated_check.SelfCheckState(cache_dir="") + assert state._statefile_path is None + + def test_reads_expected_statefile(self, tmpdir: Path) -> None: + # GIVEN + cache_dir = tmpdir / "cache_dir" + expected_path = ( + cache_dir + / "selfcheck" + / self_outdated_check._get_statefile_name(sys.prefix) + ) + + cache_dir.mkdir() + (cache_dir / "selfcheck").mkdir() + expected_path.write_text('{"foo": "bar"}') + + # WHEN + state = self_outdated_check.SelfCheckState(cache_dir=str(cache_dir)) + + # THEN + assert state._statefile_path == expected_path + assert state._state == {"foo": "bar"} + + def test_writes_expected_statefile(self, tmpdir: Path) -> None: + # GIVEN + cache_dir = tmpdir / "cache_dir" + cache_dir.mkdir() + expected_path = ( + cache_dir + / "selfcheck" + / self_outdated_check._get_statefile_name(sys.prefix) + ) + + # WHEN + state = self_outdated_check.SelfCheckState(cache_dir=str(cache_dir)) + state.set("1.0.0", datetime.datetime(2000, 1, 1, 0, 0, 0)) + + # THEN + assert state._statefile_path == expected_path + + contents = expected_path.read_text() + assert json.loads(contents) == { + "key": sys.prefix, + "last_check": "2000-01-01T00:00:00Z", + "pypi_version": "1.0.0", + } diff --git a/tests/unit/test_utils_subprocess.py b/tests/unit/test_utils_subprocess.py index ea3a7b26175..a694b717fcb 100644 --- a/tests/unit/test_utils_subprocess.py +++ b/tests/unit/test_utils_subprocess.py @@ -262,7 +262,7 @@ def test_info_logging__subprocess_error( [ # pytest's caplog overrides th formatter, which means that we # won't see the message formatted through our formatters. - ("pip.subprocessor", ERROR, "[present-diagnostic]"), + ("pip.subprocessor", ERROR, "[present-rich]"), ], ) # The spinner should spin three times in this case since the